From b39e1078e9cdad3ffdcf7f83262804674120f39b Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Fri, 6 Nov 2020 01:41:31 +0100 Subject: [PATCH 1/7] parallel sent of logs to graylog --- lib/gelf_logger.ex | 166 ++------------------------------- lib/gelf_logger/application.ex | 14 +++ lib/gelf_logger/worker.ex | 160 +++++++++++++++++++++++++++++++ mix.exs | 16 ++-- mix.lock | 10 +- 5 files changed, 196 insertions(+), 170 deletions(-) create mode 100644 lib/gelf_logger/application.ex create mode 100644 lib/gelf_logger/worker.ex diff --git a/lib/gelf_logger.ex b/lib/gelf_logger.ex index 53b1ba4..9add59f 100644 --- a/lib/gelf_logger.ex +++ b/lib/gelf_logger.ex @@ -77,11 +77,6 @@ defmodule Logger.Backends.Gelf do [protofy/erl_graylog_sender](https://github.com/protofy/erl_graylog_sender). """ - @max_size 1_047_040 - @max_packet_size 8192 - @max_payload_size 8180 - @epoch :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}) - @behaviour :gen_event def init({__MODULE__, name}) do @@ -103,7 +98,7 @@ defmodule Logger.Backends.Gelf do def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - log_event(level, msg, ts, md, state) + GelfLogger.Worker.start_child(level, msg, ts, md, state) end {:ok, state} @@ -182,14 +177,11 @@ defmodule Logger.Backends.Gelf do end port = - cond do - is_binary(port) -> - {val, ""} = Integer.parse(to_string(port)) - - val - - true -> - port + if is_binary(port) do + {val, ""} = Integer.parse(to_string(port)) + val + else + port end %{ @@ -207,150 +199,4 @@ defmodule Logger.Backends.Gelf do format: format } end - - defp log_event(level, msg, ts, md, state) do - {level, msg, ts, md} = format(level, msg, ts, md, state[:format]) - - int_level = - case level do - :debug -> 7 - :info -> 6 - :warn -> 4 - :error -> 3 - end - - fields = - md - |> take_metadata(state[:metadata]) - |> Keyword.merge(state[:tags]) - |> Map.new(fn {k, v} -> - if is_list(v) or String.Chars.impl_for(v) == nil do - {"_#{k}", inspect(v)} - else - {"_#{k}", to_string(v)} - end - end) - - {{year, month, day}, {hour, min, sec, milli}} = ts - - epoch_seconds = - :calendar.datetime_to_gregorian_seconds({{year, month, day}, {hour, min, sec}}) - @epoch - - {timestamp, _remainder} = "#{epoch_seconds}.#{milli}" |> Float.parse() - - msg_formatted = - if is_tuple(state[:format]), do: msg, else: format_event(level, msg, ts, md, state) - - gelf = - %{ - short_message: String.slice(to_string(msg_formatted), 0..79), - full_message: to_string(msg_formatted), - version: "1.1", - host: state[:host], - level: int_level, - timestamp: Float.round(timestamp, 3), - _application: state[:application] - } - |> Map.merge(fields) - - data = encode(gelf, state[:encoder]) |> compress(state[:compression]) - - size = byte_size(data) - - cond do - to_string(msg_formatted) == "" -> - # Skip empty messages - :ok - - size > @max_size -> - raise ArgumentError, message: "Message too large" - - size > @max_packet_size -> - num = div(size, @max_packet_size) - - num = - if num * @max_packet_size < size do - num + 1 - else - num - end - - id = :crypto.strong_rand_bytes(8) - - send_chunks( - state[:socket], - state[:gl_host], - state[:port], - data, - id, - :binary.encode_unsigned(num), - 0, - size - ) - - true -> - :gen_udp.send(state[:socket], state[:gl_host], state[:port], data) - end - end - - defp format(level, message, timestamp, metadata, {module, function}) do - apply(module, function, [level, message, timestamp, metadata]) - end - - defp format(level, message, timestamp, metadata, _), - do: {level, message, timestamp, metadata} - - defp send_chunks(socket, host, port, data, id, num, seq, size) when size > @max_payload_size do - <> = data - - :gen_udp.send(socket, host, port, make_chunk(payload, id, num, seq)) - - send_chunks(socket, host, port, rest, id, num, seq + 1, byte_size(rest)) - end - - defp send_chunks(socket, host, port, data, id, num, seq, _size) do - :gen_udp.send(socket, host, port, make_chunk(data, id, num, seq)) - end - - defp make_chunk(payload, id, num, seq) do - bin = :binary.encode_unsigned(seq) - - <<0x1E, 0x0F, id::binary-size(8), bin::binary-size(1), num::binary-size(1), payload::binary>> - end - - defp encode(data, encoder) do - :erlang.apply(encoder, :encode!, [data]) - end - - defp compress(data, type) do - case type do - :gzip -> - :zlib.gzip(data) - - :zlib -> - :zlib.compress(data) - - _ -> - data - end - end - - # Ported from Logger.Backends.Console - defp format_event(level, msg, ts, md, %{format: format, metadata: keys}) do - Logger.Formatter.format(format, level, msg, ts, take_metadata(md, keys)) - end - - # Ported from Logger.Backends.Console - defp take_metadata(metadata, :all) do - Keyword.drop(metadata, [:crash_reason, :ancestors, :callers]) - end - - defp take_metadata(metadata, keys) do - Enum.reduce(keys, [], fn key, acc -> - case Keyword.fetch(metadata, key) do - {:ok, val} -> [{key, val} | acc] - :error -> acc - end - end) - end end diff --git a/lib/gelf_logger/application.ex b/lib/gelf_logger/application.ex new file mode 100644 index 0000000..9e27dd5 --- /dev/null +++ b/lib/gelf_logger/application.ex @@ -0,0 +1,14 @@ +defmodule GelfLogger.Application do + @moduledoc false + use Application + + @impl Application + def start(_type, _args) do + children = [ + {Task.Supervisor, name: GelfLogger.Pool} + ] + + options = [strategy: :one_for_one, name: GelfLogger.Supervisor] + Supervisor.start_link(children, options) + end +end diff --git a/lib/gelf_logger/worker.ex b/lib/gelf_logger/worker.ex new file mode 100644 index 0000000..928e7a8 --- /dev/null +++ b/lib/gelf_logger/worker.ex @@ -0,0 +1,160 @@ +defmodule GelfLogger.Worker do + @supervisor GelfLogger.Pool + @max_size 1_047_040 + @max_packet_size 8192 + @max_payload_size 8180 + + def start_child(level, msg, ts, md, state) do + args = [level, msg, ts, md, state] + opts = [restart: :transient] + Task.Supervisor.start_child(@supervisor, __MODULE__, :run, [args], opts) + end + + def run([level, msg, ts, md, state]) do + {level, msg, ts, md} = format(level, msg, ts, md, state[:format]) + + int_level = + case level do + :debug -> 7 + :info -> 6 + :notice -> 5 + :warn -> 4 + :error -> 3 + :critical -> 2 + :alert -> 1 + :emergency -> 0 + end + + fields = + md + |> take_metadata(state[:metadata]) + |> Keyword.merge(state[:tags]) + |> Map.new(fn {k, v} -> + if is_list(v) or String.Chars.impl_for(v) == nil do + {"_#{k}", inspect(v)} + else + {"_#{k}", to_string(v)} + end + end) + + {{year, month, day}, {hour, min, sec, milli}} = ts + + epoch_milliseconds = + {{year, month, day}, {hour, min, sec}} + |> NaiveDateTime.from_erl!({milli, 0}) + |> DateTime.from_naive!("Etc/UTC") + |> DateTime.to_unix(:millisecond) + + timestamp = Float.round(epoch_milliseconds / 1_000, 3) + + msg_formatted = + if(is_tuple(state[:format]), do: msg, else: format_event(level, msg, ts, md, state)) + |> to_string() + + gelf = + %{ + short_message: String.slice(msg_formatted, 0..79), + full_message: msg_formatted, + version: "1.1", + host: state[:host], + level: int_level, + timestamp: timestamp, + _application: state[:application] + } + |> Map.merge(fields) + + data = + gelf + |> encode(state[:encoder]) + |> compress(state[:compression]) + + size = byte_size(data) + + cond do + msg_formatted == "" -> + # Skip empty messages + :ok + + size > @max_size -> + raise ArgumentError, message: "Message too large" + + size > @max_packet_size -> + num = div(size, @max_packet_size) + + num = + if num * @max_packet_size < size do + num + 1 + else + num + end + + id = :crypto.strong_rand_bytes(8) + + send_chunks( + state[:socket], + state[:gl_host], + state[:port], + data, + id, + :binary.encode_unsigned(num), + 0, + size + ) + + true -> + :gen_udp.send(state[:socket], state[:gl_host], state[:port], data) + end + end + + defp format(level, message, timestamp, metadata, {module, function}) do + apply(module, function, [level, message, timestamp, metadata]) + end + + defp format(level, message, timestamp, metadata, _), + do: {level, message, timestamp, metadata} + + defp send_chunks(socket, host, port, data, id, num, seq, size) when size > @max_payload_size do + <> = data + + :gen_udp.send(socket, host, port, make_chunk(payload, id, num, seq)) + + send_chunks(socket, host, port, rest, id, num, seq + 1, byte_size(rest)) + end + + defp send_chunks(socket, host, port, data, id, num, seq, _size) do + :gen_udp.send(socket, host, port, make_chunk(data, id, num, seq)) + end + + defp make_chunk(payload, id, num, seq) do + bin = :binary.encode_unsigned(seq) + + <<0x1E, 0x0F, id::binary-size(8), bin::binary-size(1), num::binary-size(1), payload::binary>> + end + + defp encode(data, encoder) do + :erlang.apply(encoder, :encode!, [data]) + end + + defp compress(data, :gzip), do: :zlib.gzip(data) + defp compress(data, :zlib), do: :zlib.compress(data) + defp compress(data, _), do: data + + # Ported from Logger.Backends.Console + defp format_event(level, msg, ts, md, %{format: format, metadata: keys}) do + Logger.Formatter.format(format, level, msg, ts, take_metadata(md, keys)) + end + + # Ported from Logger.Backends.Console + defp take_metadata(metadata, :all) do + Keyword.drop(metadata, [:crash_reason, :ancestors, :callers]) + end + + defp take_metadata(metadata, keys) do + Enum.reduce(keys, [], fn key, acc -> + case Keyword.fetch(metadata, key) do + {:ok, val} -> [{key, val} | acc] + :error -> acc + end + end) + end +end diff --git a/mix.exs b/mix.exs index c55f741..e342b4b 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule GelfLogger.Mixfile do def project do [app: :gelf_logger, version: "0.10.0", - elixir: "~> 1.2", + elixir: "~> 1.8", build_embedded: Mix.env == :prod, start_permanent: Mix.env == :prod, elixirc_paths: elixirc_paths(Mix.env()), @@ -14,7 +14,7 @@ defmodule GelfLogger.Mixfile do # Docs name: "GELF Logger", - source_url: "https://github.com/jschniper/gelf_logger", + source_url: "https://github.com/manuel-rubio/gelf_logger", docs: [ main: "Logger.Backends.Gelf", extras: ["README.md"] @@ -29,14 +29,17 @@ defmodule GelfLogger.Mixfile do # # Type "mix help compile.app" for more information def application do - [applications: [:logger]] + [ + extra_applications: [:logger], + mod: {GelfLogger.Application, []} + ] end defp deps do [ - {:ex_doc, "~> 0.14", only: :dev}, - {:jason, ">= 1.0.0", only: [:dev, :test]}, - {:poison, ">= 1.0.0", only: [:dev, :test]}, + {:ex_doc, "~> 0.23", only: :dev}, + {:jason, "~> 1.2", optional: true}, + {:poison, "~> 4.0", optional: true} ] end @@ -50,7 +53,6 @@ defmodule GelfLogger.Mixfile do defp package do [ files: ["lib", "mix.exs", "README*", "LICENSE" ], - maintainers: ["Joshua Schniper"], licenses: ["MIT"], links: %{"Github": "https://github.com/jschniper/gelf_logger"} ] diff --git a/mix.lock b/mix.lock index 6b5fc59..3fbb851 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,10 @@ %{ "earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], [], "hexpm", "0fdcd651f9689e81cda24c8e5d06947c5aca69dbd8ce3d836b02bcd0c6004592"}, - "ex_doc": {:hex, :ex_doc, "0.14.3", "e61cec6cf9731d7d23d254266ab06ac1decbb7651c3d1568402ec535d387b6f7", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm", "6bf36498c4c67fdbe6d4ad73a112098cbcc09b147b859219b023fc2636729bf6"}, - "jason": {:hex, :jason, "1.0.0", "0f7cfa9bdb23fed721ec05419bcee2b2c21a77e926bce0deda029b5adc716fe2", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b96c400e04b7b765c0854c05a4966323e90c0d11fee0483b1567cda079abb205"}, - "poison": {:hex, :poison, "2.0.1", "81248a36d1b602b17ea6556bfa8952492091f01af05173de11f8b297e2bbf088", [:mix], [], "hexpm", "7f34906a0839f3b49b9b7647461c5144787611f599e8d743214280761699df2b"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"}, + "ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"}, + "jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"}, + "makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, + "poison": {:hex, :poison, "4.0.1", "bcb755a16fac91cad79bfe9fc3585bb07b9331e50cfe3420a24bcc2d735709ae", [:mix], [], "hexpm", "ba8836feea4b394bb718a161fc59a288fe0109b5006d6bdf97b6badfcf6f0f25"}, } From 89cce22ffc97047d657794b29450717db484917c Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Fri, 6 Nov 2020 03:07:25 +0100 Subject: [PATCH 2/7] refactor, adding async mode in difference from previous one and fix tests --- config/config.exs | 3 +- lib/gelf_logger/worker.ex | 3 +- .../backends/gelf.ex} | 5 +- lib/logger/backends/gelf_async.ex | 107 +++++++ test/gelf_async_test.exs | 293 ++++++++++++++++++ test/{gelf_logger_test.exs => gelf_test.exs} | 94 +++--- 6 files changed, 454 insertions(+), 51 deletions(-) rename lib/{gelf_logger.ex => logger/backends/gelf.ex} (95%) create mode 100644 lib/logger/backends/gelf_async.ex create mode 100644 test/gelf_async_test.exs rename test/{gelf_logger_test.exs => gelf_test.exs} (63%) diff --git a/config/config.exs b/config/config.exs index 8c7ab73..4aa5291 100644 --- a/config/config.exs +++ b/config/config.exs @@ -9,7 +9,8 @@ use Mix.Config # 3rd-party users, it should be done in your "mix.exs" file. config :logger, - backends: [{Logger.Backends.Gelf, :gelf_logger}] + backends: [], + truncate: :infinity config :logger, :gelf_logger, json_encoder: Poison, diff --git a/lib/gelf_logger/worker.ex b/lib/gelf_logger/worker.ex index 928e7a8..2b0a3e4 100644 --- a/lib/gelf_logger/worker.ex +++ b/lib/gelf_logger/worker.ex @@ -76,7 +76,8 @@ defmodule GelfLogger.Worker do :ok size > @max_size -> - raise ArgumentError, message: "Message too large" + # Ignore huge messages + :ok size > @max_packet_size -> num = div(size, @max_packet_size) diff --git a/lib/gelf_logger.ex b/lib/logger/backends/gelf.ex similarity index 95% rename from lib/gelf_logger.ex rename to lib/logger/backends/gelf.ex index 9add59f..0f8635b 100644 --- a/lib/gelf_logger.ex +++ b/lib/logger/backends/gelf.ex @@ -1,7 +1,6 @@ defmodule Logger.Backends.Gelf do @moduledoc """ GELF Logger Backend - # GelfLogger [![Build Status](https://travis-ci.org/jschniper/gelf_logger.svg?branch=master)](https://travis-ci.org/jschniper/gelf_logger) A logger backend that will generate Graylog Extended Log Format messages. The current version only supports UDP messages. @@ -79,7 +78,7 @@ defmodule Logger.Backends.Gelf do @behaviour :gen_event - def init({__MODULE__, name}) do + def init({_module, name}) do if user = Process.whereis(:user) do Process.group_leader(self(), user) {:ok, configure(name, [])} @@ -98,7 +97,7 @@ defmodule Logger.Backends.Gelf do def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - GelfLogger.Worker.start_child(level, msg, ts, md, state) + GelfLogger.Worker.run([level, msg, ts, md, state]) end {:ok, state} diff --git a/lib/logger/backends/gelf_async.ex b/lib/logger/backends/gelf_async.ex new file mode 100644 index 0000000..67cfb76 --- /dev/null +++ b/lib/logger/backends/gelf_async.ex @@ -0,0 +1,107 @@ +defmodule Logger.Backends.GelfAsync do + @moduledoc """ + GELF Logger Backend Async + + A logger backend that will generate Graylog Extended Log Format messages. The + current version only supports UDP messages. This module specify an async way + to send the messages avoiding a bottleneck. + + ## Configuration + + In the config.exs, add gelf_logger as a backend like this: + + ``` + config :logger, + backends: [:console, {Logger.Backends.GelfAsync, :gelf_logger}] + ``` + + In addition, you'll need to pass in some configuration items to the backend + itself: + + ``` + config :logger, :gelf_logger, + host: "127.0.0.1", + port: 12201, + format: "$message", + application: "myapp", + compression: :gzip, # Defaults to :gzip, also accepts :zlib or :raw + metadata: [:request_id, :function, :module, :file, :line], + hostname: "hostname-override", + json_encoder: Poison, + tags: [ + list: "of", + extra: "tags" + ] + ``` + + In addition, if you want to use your custom metadata formatter as a "callback", + you'll need to add below configuration entry: + + ``` + format: {Module, :function} + ``` + Please bear in mind that your formating function MUST return a tuple in following + format: `{level, message, timestamp, metadata}` + + + In addition to the backend configuration, you might want to check the + [Logger configuration](https://hexdocs.pm/logger/Logger.html) for other + options that might be important for your particular environment. In + particular, modifying the `:utc_log` setting might be necessary + depending on your server configuration. + This backend supports `metadata: :all`. + + ### Note on the JSON encoder: + + Currently, the logger defaults to Poison but it can be switched out for any + module that has an encode!/1 function. + + ## Usage + + Just use Logger as normal. + + ## Improvements + + - [x] Tests + - [ ] TCP Support + - [x] Options for compression (none, zlib) + - [x] Send timestamp instead of relying on the Graylog server to set it + - [x] Find a better way of pulling the hostname + + And probably many more. This is only out here because it might be useful to + someone in its current state. Pull requests are always welcome. + + ## Notes + + Credit where credit is due, this would not exist without + [protofy/erl_graylog_sender](https://github.com/protofy/erl_graylog_sender). + """ + + @behaviour :gen_event + + defdelegate init(args), to: Logger.Backends.Gelf + + defdelegate handle_call(message, state), to: Logger.Backends.Gelf + + def handle_event({_level, gl, _event}, state) when node(gl) != node() do + {:ok, state} + end + + def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state) do + if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do + GelfLogger.Worker.run([level, msg, ts, md, state]) + end + + {:ok, state} + end + + def handle_event(:flush, state) do + {:ok, state} + end + + defdelegate handle_info(message, state), to: Logger.Backends.Gelf + + defdelegate code_change(old_vsn, state, extra), to: Logger.Backends.Gelf + + defdelegate terminate(reason, state), to: Logger.Backends.Gelf +end diff --git a/test/gelf_async_test.exs b/test/gelf_async_test.exs new file mode 100644 index 0000000..4b18f36 --- /dev/null +++ b/test/gelf_async_test.exs @@ -0,0 +1,293 @@ +defmodule Logger.Backends.GelfAsyncTest do + require Logger + + use ExUnit.Case, async: false + doctest Logger.Backends.GelfAsync + + @default_env Application.get_env(:logger, :gelf_logger) + Logger.add_backend({Logger.Backends.GelfAsync, :gelf_logger}) + + setup do + {:ok, socket} = :gen_udp.open(0, [:binary, active: true]) + {:ok, port} = :inet.port(socket) + + {:ok, [socket: socket, port: port]} + end + + test "sends a message via udp", context do + reconfigure_backend(port: context[:port]) + + Logger.warn("test") + + assert_receive {:udp, _socket, address, _port, packet}, 2000 + + # Should be coming from localhost + assert address == {127, 0, 0, 1} + + map = process_packet(packet) + + assert map["version"] == "1.1" + assert map["_application"] == "myapp" + assert map["short_message"] == "test" + assert map["full_message"] == "test" + end + + test "convert port from binary to integer", context do + reconfigure_backend(port: to_string(context[:port])) + + Logger.info("test") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["version"] == "1.1" + assert map["_application"] == "myapp" + assert map["short_message"] == "test" + assert map["full_message"] == "test" + end + + test "convert domain from list to binary", context do + if Version.compare(System.version(), "1.10.0") in [:gt, :eq] do + reconfigure_backend(metadata: :all, port: context[:port]) + + Logger.info("test") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + assert %{"_domain" => "[:elixir]"} = process_packet(packet) + end + end + + test "configurable source (host)", context do + reconfigure_backend(hostname: 'host-dev-1', port: context[:port]) + + Logger.info("test") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["host"] == "host-dev-1" + end + + test "configurable tags", context do + reconfigure_backend(tags: [foo: "bar", baz: "qux"], port: context[:port]) + + Logger.info("test") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["_foo"] == "bar" + assert map["_baz"] == "qux" + end + + test "configurable metadata", context do + reconfigure_backend(metadata: [:this], port: context[:port]) + + Logger.metadata(this: "that", something: "else") + Logger.info("test") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["_application"] == "myapp" + assert map["_this"] == "that" + assert map["_something"] == nil + end + + test "all metadata possible", context do + reconfigure_backend(metadata: :all, port: context[:port]) + + Logger.metadata(this: "that", something: "else") + Logger.info("test") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["_application"] == "myapp" + assert map["_this"] == "that" + assert map["_something"] == "else" + end + + test "format message", context do + reconfigure_backend(format: "[$level] $message", port: context[:port]) + + Logger.info("test") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["short_message"] == "[info] test" + assert map["full_message"] == "[info] test" + end + + test "skip empty messages", context do + reconfigure_backend(format: "", port: context[:port]) + + Logger.info("test") + + refute_receive {:udp, _socket, _address, _port, _packet}, 2000 + end + + test "short message should cap at 80 characters", context do + reconfigure_backend(port: context[:port]) + + Logger.info( + "This is a test string that is over eighty characters but only because I kept typing garbage long after I had run out of things to say" + ) + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["short_message"] != map["full_message"] + assert String.length(map["short_message"]) <= 80 + end + + test "log levels are being set correctly", context do + reconfigure_backend(port: context[:port]) + + # DEBUG + Logger.debug("debug") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["level"] == 7 + + # INFO + Logger.info("info") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["level"] == 6 + + # WARN + Logger.warn("warn") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["level"] == 4 + + # ERROR + Logger.error("error") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert map["level"] == 3 + end + + test "should ignore the log if max message size is exceeded", context do + reconfigure_backend(port: context[:port]) + Logger.info(:crypto.strong_rand_bytes(2_000_000) |> :base64.encode()) + refute_receive {:udp, _socket, _address, _port, _packet}, 2000 + end + + test "using compression gzip", context do + reconfigure_backend(compression: :gzip, port: context[:port]) + + Logger.info("test gzip") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + {:error, _} = Poison.decode(packet) + + map = process_packet(packet) + + assert(map["full_message"] == "test gzip") + end + + test "using compression zlib", context do + reconfigure_backend(compression: :zlib, port: context[:port]) + + Logger.info("test zlib") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + {:error, _} = Poison.decode(packet) + + map = process_packet(packet) + + assert(map["full_message"] == "test zlib") + end + + test "switching JSON encoder", context do + reconfigure_backend(json_encoder: Jason, port: context[:port]) + + Logger.info("test different encoder") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert(map["full_message"] == "test different encoder") + end + + test "can use custom formatter", context do + reconfigure_backend( + format: {Test.Support.LogFormatter, :format}, + metadata: :all, + port: context[:port] + ) + + Logger.info("test formatter callback") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + assert(Map.has_key?(map, "_timestamp_us")) + end + + test "cannot use nonexistent custom formatter", context do + reconfigure_backend( + format: {Test.Support.LogFormatter, :bad_format}, + metadata: :all, + port: context[:port] + ) + + Logger.info("test bad formatter callback") + + assert_receive {:udp, _socket, _address, _port, packet}, 2000 + + map = process_packet(packet) + + refute(Map.has_key?(map, "_timestamp_us")) + assert(map["full_message"] == "test bad formatter callback") + end + + defp process_packet(packet) do + compression = Application.get_env(:logger, :gelf_logger)[:compression] + + data = + case compression do + :gzip -> :zlib.gunzip(packet) + :zlib -> :zlib.uncompress(packet) + _ -> packet + end + + {:ok, map} = Poison.decode(data |> to_string) + + map + end + + defp reconfigure_backend(new_env) do + Logger.remove_backend({Logger.Backends.GelfAsync, :gelf_logger}) + Application.put_env(:logger, :gelf_logger, Keyword.merge(@default_env, new_env)) + Logger.add_backend({Logger.Backends.GelfAsync, :gelf_logger}) + :ok + end +end diff --git a/test/gelf_logger_test.exs b/test/gelf_test.exs similarity index 63% rename from test/gelf_logger_test.exs rename to test/gelf_test.exs index 52ac627..fe93523 100644 --- a/test/gelf_logger_test.exs +++ b/test/gelf_test.exs @@ -1,24 +1,25 @@ -defmodule GelfLoggerTest do +defmodule Logger.Backends.GelfTest do require Logger - use ExUnit.Case, async: true + use ExUnit.Case, async: false doctest Logger.Backends.Gelf @default_env Application.get_env(:logger, :gelf_logger) Logger.add_backend({Logger.Backends.Gelf, :gelf_logger}) setup do - {:ok, socket} = :gen_udp.open(12201, [:binary, {:active, false}]) + {:ok, socket} = :gen_udp.open(0, [:binary, active: true]) + {:ok, port} = :inet.port(socket) - {:ok, [socket: socket]} + {:ok, [socket: socket, port: port]} end test "sends a message via udp", context do - reconfigure_backend() + reconfigure_backend(port: context[:port]) - Logger.info("test") + Logger.warn("test") - {:ok, {address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, address, _port, packet}, 2000 # Should be coming from localhost assert address == {127, 0, 0, 1} @@ -32,11 +33,11 @@ defmodule GelfLoggerTest do end test "convert port from binary to integer", context do - reconfigure_backend(port: "12201") + reconfigure_backend(port: to_string(context[:port])) Logger.info("test") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -48,22 +49,22 @@ defmodule GelfLoggerTest do test "convert domain from list to binary", context do if Version.compare(System.version(), "1.10.0") in [:gt, :eq] do - reconfigure_backend(metadata: :all) + reconfigure_backend(metadata: :all, port: context[:port]) Logger.info("test") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 assert %{"_domain" => "[:elixir]"} = process_packet(packet) end end test "configurable source (host)", context do - reconfigure_backend(hostname: 'host-dev-1') + reconfigure_backend(hostname: 'host-dev-1', port: context[:port]) Logger.info("test") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -71,11 +72,11 @@ defmodule GelfLoggerTest do end test "configurable tags", context do - reconfigure_backend(tags: [foo: "bar", baz: "qux"]) + reconfigure_backend(tags: [foo: "bar", baz: "qux"], port: context[:port]) Logger.info("test") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -84,12 +85,12 @@ defmodule GelfLoggerTest do end test "configurable metadata", context do - reconfigure_backend(metadata: [:this]) + reconfigure_backend(metadata: [:this], port: context[:port]) Logger.metadata(this: "that", something: "else") Logger.info("test") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -99,12 +100,12 @@ defmodule GelfLoggerTest do end test "all metadata possible", context do - reconfigure_backend(metadata: :all) + reconfigure_backend(metadata: :all, port: context[:port]) Logger.metadata(this: "that", something: "else") Logger.info("test") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -114,11 +115,11 @@ defmodule GelfLoggerTest do end test "format message", context do - reconfigure_backend(format: "[$level] $message") + reconfigure_backend(format: "[$level] $message", port: context[:port]) Logger.info("test") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -127,21 +128,21 @@ defmodule GelfLoggerTest do end test "skip empty messages", context do - reconfigure_backend(format: "") + reconfigure_backend(format: "", port: context[:port]) Logger.info("test") - assert {:error, :timeout} == :gen_udp.recv(context[:socket], 0, 1000) + refute_receive {:udp, _socket, _address, _port, _packet}, 2000 end test "short message should cap at 80 characters", context do - reconfigure_backend() + reconfigure_backend(port: context[:port]) Logger.info( "This is a test string that is over eighty characters but only because I kept typing garbage long after I had run out of things to say" ) - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -150,12 +151,12 @@ defmodule GelfLoggerTest do end test "log levels are being set correctly", context do - reconfigure_backend() + reconfigure_backend(port: context[:port]) # DEBUG Logger.debug("debug") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -164,7 +165,7 @@ defmodule GelfLoggerTest do # INFO Logger.info("info") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -173,7 +174,7 @@ defmodule GelfLoggerTest do # WARN Logger.warn("warn") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -182,26 +183,25 @@ defmodule GelfLoggerTest do # ERROR Logger.error("error") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) assert map["level"] == 3 end - # The Logger module truncates all messages over 8192 bytes so this can't be tested - test "should raise error if max message size is exceeded" do - # assert_raise(ArgumentError, "Message too large", fn -> - # Logger.info :crypto.rand_bytes(1000000) |> :base64.encode - # end) + test "should ignore the log if max message size is exceeded", context do + reconfigure_backend(port: context[:port]) + Logger.info(:crypto.strong_rand_bytes(2_000_000) |> :base64.encode()) + refute_receive {:udp, _socket, _address, _port, _packet}, 2000 end test "using compression gzip", context do - reconfigure_backend(compression: :gzip) + reconfigure_backend(compression: :gzip, port: context[:port]) Logger.info("test gzip") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 {:error, _} = Poison.decode(packet) @@ -211,11 +211,11 @@ defmodule GelfLoggerTest do end test "using compression zlib", context do - reconfigure_backend(compression: :zlib) + reconfigure_backend(compression: :zlib, port: context[:port]) Logger.info("test zlib") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 {:error, _} = Poison.decode(packet) @@ -225,11 +225,11 @@ defmodule GelfLoggerTest do end test "switching JSON encoder", context do - reconfigure_backend(json_encoder: Jason) + reconfigure_backend(json_encoder: Jason, port: context[:port]) Logger.info("test different encoder") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -239,12 +239,13 @@ defmodule GelfLoggerTest do test "can use custom formatter", context do reconfigure_backend( format: {Test.Support.LogFormatter, :format}, - metadata: :all + metadata: :all, + port: context[:port] ) Logger.info("test formatter callback") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -254,12 +255,13 @@ defmodule GelfLoggerTest do test "cannot use nonexistent custom formatter", context do reconfigure_backend( format: {Test.Support.LogFormatter, :bad_format}, - metadata: :all + metadata: :all, + port: context[:port] ) Logger.info("test bad formatter callback") - {:ok, {_address, _port, packet}} = :gen_udp.recv(context[:socket], 0, 2000) + assert_receive {:udp, _socket, _address, _port, packet}, 2000 map = process_packet(packet) @@ -282,7 +284,7 @@ defmodule GelfLoggerTest do map end - defp reconfigure_backend(new_env \\ []) do + defp reconfigure_backend(new_env) do Logger.remove_backend({Logger.Backends.Gelf, :gelf_logger}) Application.put_env(:logger, :gelf_logger, Keyword.merge(@default_env, new_env)) Logger.add_backend({Logger.Backends.Gelf, :gelf_logger}) From 382a4584571ab547bf1bd59b69e8fca1c7d3f6dd Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Tue, 10 Nov 2020 13:00:53 +0100 Subject: [PATCH 3/7] refactor, use precreated workers instead of tasks to limit the consumption of resources --- lib/gelf_logger/application.ex | 9 +++++- lib/gelf_logger/balancer.ex | 51 +++++++++++++++++++++++++++++++ lib/gelf_logger/worker.ex | 18 +++++++---- lib/logger/backends/gelf.ex | 2 +- lib/logger/backends/gelf_async.ex | 2 +- 5 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 lib/gelf_logger/balancer.ex diff --git a/lib/gelf_logger/application.ex b/lib/gelf_logger/application.ex index 9e27dd5..00760c6 100644 --- a/lib/gelf_logger/application.ex +++ b/lib/gelf_logger/application.ex @@ -2,10 +2,17 @@ defmodule GelfLogger.Application do @moduledoc false use Application + @default_pool_size 5 + + defp pool_size() do + Application.get_env(:logger, :gelf_logger)[:pool_size] || @default_pool_size + end + @impl Application def start(_type, _args) do children = [ - {Task.Supervisor, name: GelfLogger.Pool} + {DynamicSupervisor, strategy: :one_for_one, name: GelfLogger.Pool}, + {GelfLogger.Balancer, [pool_size: pool_size()]} ] options = [strategy: :one_for_one, name: GelfLogger.Supervisor] diff --git a/lib/gelf_logger/balancer.ex b/lib/gelf_logger/balancer.ex new file mode 100644 index 0000000..600c02e --- /dev/null +++ b/lib/gelf_logger/balancer.ex @@ -0,0 +1,51 @@ +defmodule GelfLogger.Balancer do + use GenServer + + @supervisor GelfLogger.Pool + @worker GelfLogger.Worker + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + def start_new_child() do + DynamicSupervisor.start_child(@supervisor, @worker) + end + + def cast(level, msg, ts, md, state) do + GenServer.cast(__MODULE__, [level, msg, ts, md, state]) + end + + @impl GenServer + def init(opts) do + pool_size = opts[:pool_size] || raise """ + Gelf Logger: Not provided pool_size configuration! + """ + pids = + for _ <- 1..pool_size do + {:ok, pid} = start_new_child() + Process.monitor(pid) + pid + end + + {:ok, pids} + end + + @impl GenServer + def handle_cast(msg, [pid | pids]) do + :ok = GenServer.cast(pid, msg) + {:noreply, pids ++ [pid]} + end + + @impl GenServer + def handle_info({:DOWN, _ref, :process, old_pid, _reason}, pids) do + {:ok, new_pid} = start_new_child() + {:noreply, [new_pid | pids -- [old_pid]]} + end + + @impl GenServer + def terminate(_reason, pids) do + Enum.each pids, &GenServer.stop/1 + :ok + end +end diff --git a/lib/gelf_logger/worker.ex b/lib/gelf_logger/worker.ex index 2b0a3e4..71a9cd6 100644 --- a/lib/gelf_logger/worker.ex +++ b/lib/gelf_logger/worker.ex @@ -1,16 +1,21 @@ defmodule GelfLogger.Worker do - @supervisor GelfLogger.Pool + use GenServer, restart: :temporary + @max_size 1_047_040 @max_packet_size 8192 @max_payload_size 8180 - def start_child(level, msg, ts, md, state) do - args = [level, msg, ts, md, state] - opts = [restart: :transient] - Task.Supervisor.start_child(@supervisor, __MODULE__, :run, [args], opts) + def start_link([]) do + GenServer.start_link(__MODULE__, []) + end + + @impl GenServer + def init([]) do + {:ok, []} end - def run([level, msg, ts, md, state]) do + @impl GenServer + def handle_cast([level, msg, ts, md, state], []) do {level, msg, ts, md} = format(level, msg, ts, md, state[:format]) int_level = @@ -105,6 +110,7 @@ defmodule GelfLogger.Worker do true -> :gen_udp.send(state[:socket], state[:gl_host], state[:port], data) end + {:noreply, []} end defp format(level, message, timestamp, metadata, {module, function}) do diff --git a/lib/logger/backends/gelf.ex b/lib/logger/backends/gelf.ex index 0f8635b..b774000 100644 --- a/lib/logger/backends/gelf.ex +++ b/lib/logger/backends/gelf.ex @@ -97,7 +97,7 @@ defmodule Logger.Backends.Gelf do def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - GelfLogger.Worker.run([level, msg, ts, md, state]) + GelfLogger.Worker.handle_cast([level, msg, ts, md, state], []) end {:ok, state} diff --git a/lib/logger/backends/gelf_async.ex b/lib/logger/backends/gelf_async.ex index 67cfb76..556ae3d 100644 --- a/lib/logger/backends/gelf_async.ex +++ b/lib/logger/backends/gelf_async.ex @@ -89,7 +89,7 @@ defmodule Logger.Backends.GelfAsync do def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - GelfLogger.Worker.run([level, msg, ts, md, state]) + GelfLogger.Balancer.cast(level, msg, ts, md, state) end {:ok, state} From 4028d41e130acfc173ad164a09064b75935be30f Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Tue, 10 Nov 2020 14:25:04 +0100 Subject: [PATCH 4/7] refactor configuration and state, it creates an UDP socket per worker --- README.md | 2 +- lib/gelf_logger/balancer.ex | 29 ++++++++--- lib/gelf_logger/config.ex | 81 +++++++++++++++++++++++++++++++ lib/gelf_logger/worker.ex | 42 ++++++++++------ lib/logger/backends/gelf.ex | 77 +++-------------------------- lib/logger/backends/gelf_async.ex | 12 +++-- test/gelf_async_test.exs | 2 +- test/gelf_test.exs | 7 ++- 8 files changed, 152 insertions(+), 100 deletions(-) create mode 100644 lib/gelf_logger/config.ex diff --git a/README.md b/README.md index e0a1099..fb4b38d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# GelfLogger [![Build Status](https://travis-ci.org/jschniper/gelf_logger.svg?branch=master)](https://travis-ci.org/jschniper/gelf_logger) +# GelfLogger [![Build Status](https://travis-ci.org/manuel-rubio/gelf_logger.svg?branch=master)](https://travis-ci.org/manuel-rubio/gelf_logger) A logger backend that will generate Graylog Extended Log Format messages. The current version only supports UDP messages. diff --git a/lib/gelf_logger/balancer.ex b/lib/gelf_logger/balancer.ex index 600c02e..7deea7a 100644 --- a/lib/gelf_logger/balancer.ex +++ b/lib/gelf_logger/balancer.ex @@ -12,15 +12,22 @@ defmodule GelfLogger.Balancer do DynamicSupervisor.start_child(@supervisor, @worker) end - def cast(level, msg, ts, md, state) do - GenServer.cast(__MODULE__, [level, msg, ts, md, state]) + def configure(name, options) do + GenServer.call(__MODULE__, {:configure, name, options}) + end + + def cast(level, msg, ts, md) do + GenServer.cast(__MODULE__, {:cast, level, msg, ts, md}) end @impl GenServer def init(opts) do - pool_size = opts[:pool_size] || raise """ - Gelf Logger: Not provided pool_size configuration! - """ + pool_size = + opts[:pool_size] || + raise """ + Gelf Logger: Not provided pool_size configuration! + """ + pids = for _ <- 1..pool_size do {:ok, pid} = start_new_child() @@ -32,11 +39,17 @@ defmodule GelfLogger.Balancer do end @impl GenServer - def handle_cast(msg, [pid | pids]) do - :ok = GenServer.cast(pid, msg) + def handle_cast({:cast, level, msg, ts, md}, [pid | pids]) do + :ok = GenServer.cast(pid, [level, msg, ts, md]) {:noreply, pids ++ [pid]} end + @impl GenServer + def handle_call({:configure, name, options}, from, pids) do + for pid <- pids, do: GenServer.cast(pid, {:configure, from, name, options}) + {:noreply, pids} + end + @impl GenServer def handle_info({:DOWN, _ref, :process, old_pid, _reason}, pids) do {:ok, new_pid} = start_new_child() @@ -45,7 +58,7 @@ defmodule GelfLogger.Balancer do @impl GenServer def terminate(_reason, pids) do - Enum.each pids, &GenServer.stop/1 + Enum.each(pids, &GenServer.stop/1) :ok end end diff --git a/lib/gelf_logger/config.ex b/lib/gelf_logger/config.ex new file mode 100644 index 0000000..0bd0950 --- /dev/null +++ b/lib/gelf_logger/config.ex @@ -0,0 +1,81 @@ +defmodule GelfLogger.Config do + @moduledoc """ + Configuration state internal to be shared for the normal handle + event in synchronous mode and every worker in asynchronous mode. + """ + + defstruct [ + :name, + :gl_host, + :host, + :port, + :metadata, + :level, + :application, + :socket, + :compression, + :tags, + :encoder, + :format + ] + + def configure(name, options) do + config = Keyword.merge(Application.get_env(:logger, name, []), options) + Application.put_env(:logger, name, config) + + {:ok, socket} = :gen_udp.open(0) + + {:ok, hostname} = :inet.gethostname() + + hostname = Keyword.get(config, :hostname, to_string(hostname)) + + gl_host = to_charlist(Keyword.get(config, :host)) + port = Keyword.get(config, :port) + application = Keyword.get(config, :application) + level = Keyword.get(config, :level) + metadata = Keyword.get(config, :metadata, []) + compression = Keyword.get(config, :compression, :gzip) + encoder = Keyword.get(config, :json_encoder, Poison) + tags = Keyword.get(config, :tags, []) + format = process_format(Keyword.get(config, :format, "$message")) + port = process_port(port) + + %__MODULE__{ + name: name, + gl_host: gl_host, + host: hostname, + port: port, + metadata: metadata, + level: level, + application: application, + socket: socket, + compression: compression, + tags: tags, + encoder: encoder, + format: format + } + end + + defp process_format({module, function}) when is_atom(module) and is_atom(function) do + with true <- Code.ensure_loaded?(module), + true <- function_exported?(module, function, 4) do + {module, function} + else + _ -> Logger.Formatter.compile("$message") + end + end + + defp process_format(format) do + Logger.Formatter.compile(format) + rescue + _ in ArgumentError -> + Logger.Formatter.compile("$message") + end + + defp process_port(port) when is_binary(port) do + {val, ""} = Integer.parse(port) + val + end + + defp process_port(port), do: port +end diff --git a/lib/gelf_logger/worker.ex b/lib/gelf_logger/worker.ex index 71a9cd6..2ad8020 100644 --- a/lib/gelf_logger/worker.ex +++ b/lib/gelf_logger/worker.ex @@ -4,6 +4,7 @@ defmodule GelfLogger.Worker do @max_size 1_047_040 @max_packet_size 8192 @max_payload_size 8180 + @default_name :gelf_logger def start_link([]) do GenServer.start_link(__MODULE__, []) @@ -11,12 +12,12 @@ defmodule GelfLogger.Worker do @impl GenServer def init([]) do - {:ok, []} + {:ok, GelfLogger.Config.configure(@default_name, [])} end @impl GenServer - def handle_cast([level, msg, ts, md, state], []) do - {level, msg, ts, md} = format(level, msg, ts, md, state[:format]) + def handle_cast([level, msg, ts, md], state) do + {level, msg, ts, md} = format(level, msg, ts, md, state.format) int_level = case level do @@ -32,8 +33,8 @@ defmodule GelfLogger.Worker do fields = md - |> take_metadata(state[:metadata]) - |> Keyword.merge(state[:tags]) + |> take_metadata(state.metadata) + |> Keyword.merge(state.tags) |> Map.new(fn {k, v} -> if is_list(v) or String.Chars.impl_for(v) == nil do {"_#{k}", inspect(v)} @@ -53,7 +54,7 @@ defmodule GelfLogger.Worker do timestamp = Float.round(epoch_milliseconds / 1_000, 3) msg_formatted = - if(is_tuple(state[:format]), do: msg, else: format_event(level, msg, ts, md, state)) + if(is_tuple(state.format), do: msg, else: format_event(level, msg, ts, md, state)) |> to_string() gelf = @@ -61,17 +62,17 @@ defmodule GelfLogger.Worker do short_message: String.slice(msg_formatted, 0..79), full_message: msg_formatted, version: "1.1", - host: state[:host], + host: state.host, level: int_level, timestamp: timestamp, - _application: state[:application] + _application: state.application } |> Map.merge(fields) data = gelf - |> encode(state[:encoder]) - |> compress(state[:compression]) + |> encode(state.encoder) + |> compress(state.compression) size = byte_size(data) @@ -97,9 +98,9 @@ defmodule GelfLogger.Worker do id = :crypto.strong_rand_bytes(8) send_chunks( - state[:socket], - state[:gl_host], - state[:port], + state.socket, + state.gl_host, + state.port, data, id, :binary.encode_unsigned(num), @@ -108,9 +109,20 @@ defmodule GelfLogger.Worker do ) true -> - :gen_udp.send(state[:socket], state[:gl_host], state[:port], data) + :gen_udp.send(state.socket, state.gl_host, state.port, data) end - {:noreply, []} + + {:noreply, state} + end + + def handle_cast({:configure, from, name, options}, state) do + if state.socket do + :gen_udp.close(state.socket) + end + + state = GelfLogger.Config.configure(name, options) + GenServer.reply(from, state) + {:noreply, state} end defp format(level, message, timestamp, metadata, {module, function}) do diff --git a/lib/logger/backends/gelf.ex b/lib/logger/backends/gelf.ex index b774000..b512d46 100644 --- a/lib/logger/backends/gelf.ex +++ b/lib/logger/backends/gelf.ex @@ -81,14 +81,18 @@ defmodule Logger.Backends.Gelf do def init({_module, name}) do if user = Process.whereis(:user) do Process.group_leader(self(), user) - {:ok, configure(name, [])} + {:ok, GelfLogger.Config.configure(name, [])} else {:error, :ignore} end end def handle_call({:configure, options}, state) do - {:ok, :ok, configure(state[:name], options)} + if state.socket do + :gen_udp.close(state.socket) + end + + {:ok, :ok, GelfLogger.Config.configure(state[:name], options)} end def handle_event({_level, gl, _event}, state) when node(gl) != node() do @@ -97,7 +101,7 @@ defmodule Logger.Backends.Gelf do def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - GelfLogger.Worker.handle_cast([level, msg, ts, md, state], []) + GelfLogger.Worker.handle_cast([level, msg, ts, md], state) end {:ok, state} @@ -131,71 +135,4 @@ defmodule Logger.Backends.Gelf do def terminate(_reason, _state) do :ok end - - ## Helpers - - defp configure(name, options) do - config = Keyword.merge(Application.get_env(:logger, name, []), options) - Application.put_env(:logger, name, config) - - {:ok, socket} = :gen_udp.open(0) - - {:ok, hostname} = :inet.gethostname() - - hostname = Keyword.get(config, :hostname, hostname) - - gl_host = Keyword.get(config, :host) |> to_charlist - port = Keyword.get(config, :port) - application = Keyword.get(config, :application) - level = Keyword.get(config, :level) - metadata = Keyword.get(config, :metadata, []) - compression = Keyword.get(config, :compression, :gzip) - encoder = Keyword.get(config, :json_encoder, Poison) - tags = Keyword.get(config, :tags, []) - - format = - try do - format = Keyword.get(config, :format, "$message") - - case format do - {module, function} -> - with true <- Code.ensure_loaded?(module), - true <- function_exported?(module, function, 4) do - {module, function} - else - _ -> - Logger.Formatter.compile("$message") - end - - _ -> - Logger.Formatter.compile(format) - end - rescue - _ -> - Logger.Formatter.compile("$message") - end - - port = - if is_binary(port) do - {val, ""} = Integer.parse(to_string(port)) - val - else - port - end - - %{ - name: name, - gl_host: gl_host, - host: to_string(hostname), - port: port, - metadata: metadata, - level: level, - application: application, - socket: socket, - compression: compression, - tags: tags, - encoder: encoder, - format: format - } - end end diff --git a/lib/logger/backends/gelf_async.ex b/lib/logger/backends/gelf_async.ex index 556ae3d..fa8648b 100644 --- a/lib/logger/backends/gelf_async.ex +++ b/lib/logger/backends/gelf_async.ex @@ -79,9 +79,15 @@ defmodule Logger.Backends.GelfAsync do @behaviour :gen_event - defdelegate init(args), to: Logger.Backends.Gelf + def init({_module, name}) do + state = GelfLogger.Balancer.configure(name, []) + {:ok, %{name: name, level: state.level}} + end - defdelegate handle_call(message, state), to: Logger.Backends.Gelf + def handle_call({:configure, options}, state) do + state = GelfLogger.Balancer.configure(state.name, options) + {:ok, :ok, %{state | level: state.level}} + end def handle_event({_level, gl, _event}, state) when node(gl) != node() do {:ok, state} @@ -89,7 +95,7 @@ defmodule Logger.Backends.GelfAsync do def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level} = state) do if is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt do - GelfLogger.Balancer.cast(level, msg, ts, md, state) + GelfLogger.Balancer.cast(level, msg, ts, md) end {:ok, state} diff --git a/test/gelf_async_test.exs b/test/gelf_async_test.exs index 4b18f36..7132d16 100644 --- a/test/gelf_async_test.exs +++ b/test/gelf_async_test.exs @@ -60,7 +60,7 @@ defmodule Logger.Backends.GelfAsyncTest do end test "configurable source (host)", context do - reconfigure_backend(hostname: 'host-dev-1', port: context[:port]) + reconfigure_backend(hostname: "host-dev-1", port: context[:port]) Logger.info("test") diff --git a/test/gelf_test.exs b/test/gelf_test.exs index fe93523..d45ad28 100644 --- a/test/gelf_test.exs +++ b/test/gelf_test.exs @@ -60,7 +60,7 @@ defmodule Logger.Backends.GelfTest do end test "configurable source (host)", context do - reconfigure_backend(hostname: 'host-dev-1', port: context[:port]) + reconfigure_backend(hostname: "host-dev-1", port: context[:port]) Logger.info("test") @@ -279,7 +279,10 @@ defmodule Logger.Backends.GelfTest do _ -> packet end - {:ok, map} = Poison.decode(data |> to_string) + {:ok, map} = + data + |> to_string() + |> Poison.decode() map end From 34e56a4cb1eceb0266a2c1e2d1067c2710817d6c Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Tue, 10 Nov 2020 14:27:13 +0100 Subject: [PATCH 5/7] update doc --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fb4b38d..1c9e6c9 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# GelfLogger [![Build Status](https://travis-ci.org/manuel-rubio/gelf_logger.svg?branch=master)](https://travis-ci.org/manuel-rubio/gelf_logger) +# GelfLogger [![Build Status](https://travis-ci.org/jschniper/gelf_logger.svg?branch=master)](https://travis-ci.org/jschniper/gelf_logger) A logger backend that will generate Graylog Extended Log Format messages. The current version only supports UDP messages. @@ -12,6 +12,13 @@ config :logger, backends: [:console, {Logger.Backends.Gelf, :gelf_logger}] ``` +If you want to use the asynchronous handler then you have to use: + +```elixir +config :logger, + backends: [:console, {Logger.Backends.GelfAsync, :gelf_logger}] +``` + In addition, you'll need to pass in some configuration items to the backend itself: From 0d69d93b4958f4e4866a730d0c6883550f2b1bb0 Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Thu, 10 Dec 2020 13:28:51 +0100 Subject: [PATCH 6/7] fix issue starting and configuring from Logger --- config/config.exs | 2 +- lib/gelf_logger/balancer.ex | 11 +++++++---- lib/gelf_logger/config.ex | 7 +++++++ lib/gelf_logger/worker.ex | 8 +++++--- lib/logger/backends/gelf_async.ex | 10 ++++++---- 5 files changed, 26 insertions(+), 12 deletions(-) diff --git a/config/config.exs b/config/config.exs index 4aa5291..32d2813 100644 --- a/config/config.exs +++ b/config/config.exs @@ -13,7 +13,7 @@ config :logger, truncate: :infinity config :logger, :gelf_logger, - json_encoder: Poison, + json_encoder: Jason, host: "127.0.0.1", port: 12201, application: "myapp", diff --git a/lib/gelf_logger/balancer.ex b/lib/gelf_logger/balancer.ex index 7deea7a..3e38aec 100644 --- a/lib/gelf_logger/balancer.ex +++ b/lib/gelf_logger/balancer.ex @@ -13,7 +13,11 @@ defmodule GelfLogger.Balancer do end def configure(name, options) do - GenServer.call(__MODULE__, {:configure, name, options}) + if Process.whereis(__MODULE__) do + GenServer.cast(__MODULE__, {:configure, name, options}) + else + :persistent_term.put(:gelf_logger, {:configure, name, options}) + end end def cast(level, msg, ts, md) do @@ -44,9 +48,8 @@ defmodule GelfLogger.Balancer do {:noreply, pids ++ [pid]} end - @impl GenServer - def handle_call({:configure, name, options}, from, pids) do - for pid <- pids, do: GenServer.cast(pid, {:configure, from, name, options}) + def handle_cast({:configure, name, options}, pids) do + for pid <- pids, do: GenServer.cast(pid, {:configure, name, options}) {:noreply, pids} end diff --git a/lib/gelf_logger/config.ex b/lib/gelf_logger/config.ex index 0bd0950..75ed60b 100644 --- a/lib/gelf_logger/config.ex +++ b/lib/gelf_logger/config.ex @@ -19,6 +19,13 @@ defmodule GelfLogger.Config do :format ] + def get_loglevel(name, options) do + :logger + |> Application.get_env(name, []) + |> Keyword.merge(options) + |> Keyword.get(:level) + end + def configure(name, options) do config = Keyword.merge(Application.get_env(:logger, name, []), options) Application.put_env(:logger, name, config) diff --git a/lib/gelf_logger/worker.ex b/lib/gelf_logger/worker.ex index 2ad8020..31c9382 100644 --- a/lib/gelf_logger/worker.ex +++ b/lib/gelf_logger/worker.ex @@ -12,7 +12,9 @@ defmodule GelfLogger.Worker do @impl GenServer def init([]) do - {:ok, GelfLogger.Config.configure(@default_name, [])} + {:configure, name, options} = :persistent_term.get(:gelf_logger, {:configure, @default_name, []}) + state = GelfLogger.Config.configure(name, options) + {:ok, state} end @impl GenServer @@ -115,13 +117,13 @@ defmodule GelfLogger.Worker do {:noreply, state} end - def handle_cast({:configure, from, name, options}, state) do + def handle_cast({:configure, name, options}, state) do if state.socket do :gen_udp.close(state.socket) end state = GelfLogger.Config.configure(name, options) - GenServer.reply(from, state) + :persistent_term.put(:gelf_logger, {:configure, name, options}) {:noreply, state} end diff --git a/lib/logger/backends/gelf_async.ex b/lib/logger/backends/gelf_async.ex index fa8648b..d67af03 100644 --- a/lib/logger/backends/gelf_async.ex +++ b/lib/logger/backends/gelf_async.ex @@ -80,13 +80,15 @@ defmodule Logger.Backends.GelfAsync do @behaviour :gen_event def init({_module, name}) do - state = GelfLogger.Balancer.configure(name, []) - {:ok, %{name: name, level: state.level}} + GelfLogger.Balancer.configure(name, []) + log_level = GelfLogger.Config.get_loglevel(name, []) + {:ok, %{name: name, level: log_level}} end def handle_call({:configure, options}, state) do - state = GelfLogger.Balancer.configure(state.name, options) - {:ok, :ok, %{state | level: state.level}} + GelfLogger.Balancer.configure(state.name, options) + log_level = GelfLogger.Config.get_loglevel(state.name, options) + {:ok, :ok, %{state | level: log_level}} end def handle_event({_level, gl, _event}, state) when node(gl) != node() do From 6d6c1c4a8b627564f879d0617b6f53ee05f27a4d Mon Sep 17 00:00:00 2001 From: Manuel Rubio Date: Thu, 17 Dec 2020 16:14:28 +0100 Subject: [PATCH 7/7] fix milli should be expressed as microseconds --- lib/gelf_logger/worker.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/gelf_logger/worker.ex b/lib/gelf_logger/worker.ex index 31c9382..7b29ada 100644 --- a/lib/gelf_logger/worker.ex +++ b/lib/gelf_logger/worker.ex @@ -49,7 +49,7 @@ defmodule GelfLogger.Worker do epoch_milliseconds = {{year, month, day}, {hour, min, sec}} - |> NaiveDateTime.from_erl!({milli, 0}) + |> NaiveDateTime.from_erl!({milli * 1_000, 0}) |> DateTime.from_naive!("Etc/UTC") |> DateTime.to_unix(:millisecond)