diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 059f78a..f0816fa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,5 +1,8 @@ name: CI on: + pull_request: + branches: + - main push: paths: - 'run-specs-in-docker.sh' diff --git a/amqproxy b/amqproxy new file mode 160000 index 0000000..d09258d --- /dev/null +++ b/amqproxy @@ -0,0 +1 @@ +Subproject commit d09258d8dec72b178e756e33ec806b70ee9fc94a diff --git a/spec/Dockerfile b/spec/Dockerfile index 6a46850..a17ff95 100644 --- a/spec/Dockerfile +++ b/spec/Dockerfile @@ -11,6 +11,9 @@ RUN shards install COPY src/ src/ COPY spec/ spec/ +COPY spec/config.ini /tmp/config.ini +COPY spec/config_empty.ini /tmp/config_empty.ini + COPY spec/entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/spec/amqproxy/config_spec.cr b/spec/amqproxy/config_spec.cr new file mode 100644 index 0000000..d9425cf --- /dev/null +++ b/spec/amqproxy/config_spec.cr @@ -0,0 +1,189 @@ +require "spec" +require "../../src/amqproxy/config" + +describe AMQProxy::Config do + it "loads defaults when no ini file, env vars or options are available" do + previous_argv = ARGV.clone + ARGV.clear + + ARGV.concat([ + "--config=/tmp/non_existing_file.ini", + ]) + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "localhost" + config.listen_port.should eq 5673 + config.http_port.should eq 15673 + config.log_level.should eq ::Log::Severity::Info + config.idle_connection_timeout.should eq 5 + config.term_timeout.should eq -1 + config.term_client_close_timeout.should eq 0 + config.upstream.should eq nil + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from empty config file returning default configuration" do + previous_argv = ARGV.clone + ARGV.clear + + ARGV.concat(["--config=/tmp/config_empty.ini"]) + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "localhost" + config.listen_port.should eq 5673 + config.http_port.should eq 15673 + config.log_level.should eq ::Log::Severity::Info + config.idle_connection_timeout.should eq 5 + config.term_timeout.should eq -1 + config.term_client_close_timeout.should eq 0 + config.upstream.should eq nil + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from environment variables and overrules ini file values" do + previous_argv = ARGV.clone + ARGV.clear + + ENV["LISTEN_ADDRESS"] = "example.com" + ENV["LISTEN_PORT"] = "5674" + ENV["HTTP_PORT"] = "15674" + ENV["LOG_LEVEL"] = "Error" + ENV["IDLE_CONNECTION_TIMEOUT"] = "12" + ENV["TERM_TIMEOUT"] = "13" + ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" + ENV["UPSTREAM"] = "amqp://localhost:5674" + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "example.com" + config.listen_port.should eq 5674 + config.http_port.should eq 15674 + config.log_level.should eq ::Log::Severity::Error + config.idle_connection_timeout.should eq 12 + config.term_timeout.should eq 13 + config.term_client_close_timeout.should eq 14 + config.upstream.should eq "amqp://localhost:5674" + + # Clean up + ENV.delete("LISTEN_ADDRESS") + ENV.delete("LISTEN_PORT") + ENV.delete("HTTP_PORT") + ENV.delete("LOG_LEVEL") + ENV.delete("IDLE_CONNECTION_TIMEOUT") + ENV.delete("TERM_TIMEOUT") + ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") + ENV.delete("UPSTREAM") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "reads from command line arguments and overrules env vars" do + previous_argv = ARGV.clone + ARGV.clear + + ENV["LISTEN_ADDRESS"] = "example.com" + ENV["LISTEN_PORT"] = "5674" + ENV["HTTP_PORT"] = "15674" + ENV["LOG_LEVEL"] = "Error" + ENV["IDLE_CONNECTION_TIMEOUT"] = "12" + ENV["TERM_TIMEOUT"] = "13" + ENV["TERM_CLIENT_CLOSE_TIMEOUT"] = "14" + ENV["UPSTREAM"] = "amqp://localhost:5674" + + ARGV.concat([ + "--listen=example_arg.com", + "--port=5675", + "--http-port=15675", + "--log-level=Warn", + "--idle-connection-timeout=15", + "--term-timeout=16", + "--term-client-close-timeout=17", + "amqp://localhost:5679", + ]) + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "example_arg.com" + config.log_level.should eq ::Log::Severity::Warn + config.listen_port.should eq 5675 + config.http_port.should eq 15675 + config.idle_connection_timeout.should eq 15 + config.term_timeout.should eq 16 + config.term_client_close_timeout.should eq 17 + config.upstream.should eq "amqp://localhost:5679" + + # Clean Up + ENV.delete("LISTEN_ADDRESS") + ENV.delete("LISTEN_PORT") + ENV.delete("HTTP_PORT") + ENV.delete("LOG_LEVEL") + ENV.delete("IDLE_CONNECTION_TIMEOUT") + ENV.delete("TERM_TIMEOUT") + ENV.delete("TERM_CLIENT_CLOSE_TIMEOUT") + ENV.delete("UPSTREAM") + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "sets log level to debug when debug flag is present" do + previous_argv = ARGV.clone + ARGV.clear + + ARGV.concat([ + "--listen=example_arg.com", + "--port=5675", + "--http-port=15675", + "--log-level=Warn", + "--idle-connection-timeout=15", + "--term-timeout=16", + "--term-client-close-timeout=17", + "--debug", + "amqp://localhost:5679", + ]) + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.listen_address.should eq "example_arg.com" + config.log_level.should eq ::Log::Severity::Debug + config.listen_port.should eq 5675 + config.http_port.should eq 15675 + config.idle_connection_timeout.should eq 15 + config.term_timeout.should eq 16 + config.term_client_close_timeout.should eq 17 + config.upstream.should eq "amqp://localhost:5679" + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end + + it "keeps the log level to trace when debug flag is present" do + previous_argv = ARGV.clone + ARGV.clear + + ARGV.concat([ + "--log-level=Trace", + "--debug", + ]) + + config = AMQProxy::Config.load_with_cli(ARGV) + + config.log_level.should eq ::Log::Severity::Trace + + # Restore ARGV + ARGV.clear + ARGV.concat(previous_argv) + end +end diff --git a/spec/config.ini b/spec/config.ini new file mode 100644 index 0000000..ca38d1c --- /dev/null +++ b/spec/config.ini @@ -0,0 +1,13 @@ +[main] +log_level = debug +idle_connection_timeout = 55 +term_timeout = 56 +term_client_close_timeout = 57 +upstream = amqp://localhost:5678 + +[listen] +bind = 127.0.0.1 +address = 127.0.0.2 +port = 5678 +http_port = 15678 +log_level = debug diff --git a/spec/config_empty.ini b/spec/config_empty.ini new file mode 100644 index 0000000..e69de29 diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 24e5ce2..511a0ef 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -1,3 +1,4 @@ +require "./config" require "./version" require "./server" require "./http_server" @@ -9,94 +10,27 @@ require "log" class AMQProxy::CLI Log = ::Log.for(self) - @listen_address = "localhost" - @listen_port = 5673 - @http_port = 15673 - @log_level : ::Log::Severity = ::Log::Severity::Info - @idle_connection_timeout : Int32 = 5 - @term_timeout = -1 - @term_client_close_timeout = 0 + @config : AMQProxy::Config? = nil @server : AMQProxy::Server? = nil - def parse_config(path) # ameba:disable Metrics/CyclomaticComplexity - INI.parse(File.read(path)).each do |name, section| - case name - when "main", "" - section.each do |key, value| - case key - when "upstream" then @upstream = value - when "log_level" then @log_level = ::Log::Severity.parse(value) - when "idle_connection_timeout" then @idle_connection_timeout = value.to_i - when "term_timeout" then @term_timeout = value.to_i - when "term_client_close_timeout" then @term_client_close_timeout = value.to_i - else raise "Unsupported config #{name}/#{key}" - end - end - when "listen" - section.each do |key, value| - case key - when "port" then @listen_port = value.to_i - when "bind", "address" then @listen_address = value - when "log_level" then @log_level = ::Log::Severity.parse(value) - else raise "Unsupported config #{name}/#{key}" - end - end - else raise "Unsupported config section #{name}" - end - end - rescue ex - abort ex.message - end - - def apply_env_variables - @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address - @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port - @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port - @log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level - @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout - @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout - @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout - @upstream = ENV["AMQP_URL"]? || @upstream - end - def run(argv) raise "run cant be called multiple times" unless @server.nil? - # Parse config file first - OptionParser.parse(argv) do |parser| - parser.on("-c FILE", "--config=FILE", "Load config file") { |v| parse_config(v) } - parser.invalid_option { } # Invalid arguments are handled by the next OptionParser - end - - apply_env_variables + # load cascading configuration. load sequence: defaults -> file -> env -> cli + config = @config = AMQProxy::Config.load_with_cli(argv) - # Parse CLI arguments - p = OptionParser.parse(argv) do |parser| - parser.banner = "Usage: amqproxy [options] [amqp upstream url]" - parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| - @listen_address = v - end - parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } - parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } - parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| - @idle_connection_timeout = v.to_i - end - parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| - @term_timeout = v.to_i - end - parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close before sending Close to clients (default: 0s)") do |v| - @term_client_close_timeout = v.to_i - end - parser.on("-d", "--debug", "Verbose logging") { @log_level = ::Log::Severity::Debug } - parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } - parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } - parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } - end + log_backend = if ENV.has_key?("JOURNAL_STREAM") + ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) + else + ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) + end + ::Log.setup_from_env(default_level: config.log_level, backend: log_backend) - @upstream ||= argv.shift? - upstream_url = @upstream || abort p.to_s + Log.debug { config.inspect } + upstream_url = config.upstream || abort "Upstream AMQP url is required. Add -h switch for help." u = URI.parse upstream_url + abort "Invalid upstream URL" unless u.host default_port = case u.scheme @@ -107,20 +41,13 @@ class AMQProxy::CLI port = u.port || default_port tls = u.scheme == "amqps" - log_backend = if ENV.has_key?("JOURNAL_STREAM") - ::Log::IOBackend.new(formatter: Journal::LogFormat, dispatcher: ::Log::DirectDispatcher) - else - ::Log::IOBackend.new(formatter: Stdout::LogFormat, dispatcher: ::Log::DirectDispatcher) - end - ::Log.setup_from_env(default_level: @log_level, backend: log_backend) - Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) - server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, config.idle_connection_timeout) - HTTPServer.new(server, @listen_address, @http_port.to_i) - server.listen(@listen_address, @listen_port.to_i) + HTTPServer.new(server, config.listen_address, config.http_port) + server.listen(config.listen_address, config.listen_port) shutdown @@ -149,17 +76,22 @@ class AMQProxy::CLI unless server = @server raise "Can't call shutdown before run" end + + unless config = @config + raise "Configuration has not been loaded" + end + if server.client_connections > 0 - if @term_client_close_timeout > 0 - wait_for_clients_to_close @term_client_close_timeout.seconds + if config.term_client_close_timeout > 0 + wait_for_clients_to_close config.term_client_close_timeout.seconds end server.disconnect_clients end if server.client_connections > 0 - if @term_timeout >= 0 + if config.term_timeout >= 0 spawn do - sleep @term_timeout.seconds + sleep config.term_timeout.seconds abort "Exiting with #{server.client_connections} client connections still open" end end diff --git a/src/amqproxy/config.cr b/src/amqproxy/config.cr new file mode 100644 index 0000000..46626f7 --- /dev/null +++ b/src/amqproxy/config.cr @@ -0,0 +1,111 @@ +require "ini" +require "log" +require "option_parser" + +module AMQProxy + class Config + getter listen_address = "localhost" + getter listen_port = 5673 + getter http_port = 15673 + getter log_level = Log::Severity::Info + getter idle_connection_timeout = 5 + getter term_timeout = -1 + getter term_client_close_timeout = 0 + getter upstream : String? = nil + getter debug : Bool? = nil + getter config_file = "config.ini" + + protected def load_from_file # ameba:disable Metrics/CyclomaticComplexity + if config_file.empty? || !File.exists?(config_file) + return self + end + + INI.parse(File.read(config_file)).each do |name, section| + case name + when "main", "" + section.each do |key, value| + case key + when "upstream" then @upstream = value + when "log_level" then @log_level = ::Log::Severity.parse(value) + when "idle_connection_timeout" then @idle_connection_timeout = value.to_i + when "term_timeout" then @term_timeout = value.to_i + when "term_client_close_timeout" then @term_client_close_timeout = value.to_i + else raise "Unsupported config #{name}/#{key}" + end + end + when "listen" + section.each do |key, value| + case key + when "http_port" then @http_port = value.to_i + when "port" then @listen_port = value.to_i + when "bind", "address" then @listen_address = value + when "log_level" then @log_level = ::Log::Severity.parse(value) + else raise "Unsupported config #{name}/#{key}" + end + end + else raise "Unsupported config section #{name}" + end + end + + self + end + + protected def load_from_env # ameba:disable Metrics/CyclomaticComplexity + @listen_address = ENV["LISTEN_ADDRESS"]? || @listen_address + @listen_port = ENV["LISTEN_PORT"]?.try &.to_i || @listen_port + @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port + @log_level = Log::Severity.parse(ENV["LOG_LEVEL"]? || self.log_level.to_s) || @log_level + @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout + @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout + @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout + @upstream = ENV["AMQP_URL"]? || ENV["UPSTREAM"]? || @upstream + + self + end + + protected def load_cli_options(argv) + OptionParser.parse(argv) do |parser| + parser.on("-l ADDRESS", "--listen=ADDRESS", "Address to listen on (default is localhost)") do |v| + @listen_address = v + end + parser.on("-p PORT", "--port=PORT", "Port to listen on (default: 5673)") { |v| @listen_port = v.to_i } + parser.on("-b PORT", "--http-port=PORT", "HTTP Port to listen on (default: 15673)") { |v| @http_port = v.to_i } + parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| + @idle_connection_timeout = v.to_i + end + parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| + @term_timeout = v.to_i + end + parser.on("--term-client-close-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to send Close beforing sending Close to clients (default: 0s)") do |v| + @term_client_close_timeout = v.to_i + end + parser.on("--log-level=LEVEL", "The log level (default: info)") { |v| @log_level = ::Log::Severity.parse(v) } + parser.on("-d", "--debug", "Verbose logging") { @debug = true } + parser.on("-c FILE", "--config=FILE", "Load config file") do |v| + @config_file = v + end + parser.on("-h", "--help", "Show this help") { puts parser.to_s; exit 0 } + parser.on("-v", "--version", "Display version") { puts AMQProxy::VERSION.to_s; exit 0 } + parser.invalid_option { |arg| abort "Invalid argument: #{arg}" } + end + + @upstream = argv.shift? || @upstream + + if @debug && @log_level > Log::Severity::Debug + @log_level = Log::Severity::Debug + end + + self + end + + def self.load_with_cli(argv) + new() + .load_cli_options(argv.dup) # handle config file/help/version options + .load_from_file + .load_from_env + .load_cli_options(argv) + rescue ex + abort ex.message + end + end +end