diff --git a/README.md b/README.md index 90142db..c3ff4b9 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,7 @@ You probably want to modify `/etc/systemd/system/amqproxy.service` and configure | Listen port | Port to listen on. This is the port which will be the target of clients | `--port` / `-p` | `LISTEN_PORT` | `[listen] > port` | `5673` | | Log level | Controls log verbosity.

Available levels (see [84codes/logger.cr](https://github.com/84codes/logger.cr/blob/main/src/logger.cr#L86)):
- `DEBUG`: Low-level information for developers
- `INFO`: Generic (useful) information about system operation
- `WARN`: Warnings
- `ERROR`: Handleable error conditions
- `FATAL`: Unhandleable errors that results in a program crash | `--debug` / `-d`: Sets the level to `DEBUG` | - | `[main] > log_level` | `INFO` | | Idle connection timeout | Maximum time in seconds an unused pooled connection stays open | `--idle-connection-timeout` / `-t` | `IDLE_CONNECTION_TIMEOUT` | `[main] > idle_connection_timeout` | `5` | +| Max upstream channels | Maximum number of channels per upstream connection. The effective limit will be the lowest value between this setting and the upstream server's channel_max from the connection tune packet | `--max-upstream-channels` | `MAX_UPSTREAM_CHANNELS` | `[main] > max_upstream_channels` | `65535` | | Upstream | AMQP URL that points to the upstream RabbitMQ server to which the proxy should connect to. May only contain scheme, host & port (optional). Example: `amqps://rabbitmq.example.com` | Pass as argument after all options | `AMQP_URL` | `[main] > upstream` | | ### How to configure diff --git a/config/example.ini b/config/example.ini index d47aa99..7fc055c 100644 --- a/config/example.ini +++ b/config/example.ini @@ -1,6 +1,7 @@ [main] log_level = info idle_connection_timeout = 5 +max_upstream_channels = 65535 upstream = amqp://localhost:5672 [listen] diff --git a/spec/amqproxy/server_spec.cr b/spec/amqproxy/server_spec.cr index bd8c4ca..c81f9c2 100644 --- a/spec/amqproxy/server_spec.cr +++ b/spec/amqproxy/server_spec.cr @@ -90,6 +90,19 @@ describe AMQProxy::Server do end end + it "creates multiple upstreams when max_upstream_channels is low" do + with_server(max_upstream_channels: 10_u16) do |server, proxy_url| + AMQP::Client.start(proxy_url) do |conn| + 25.times { conn.channel } + server.client_connections.should eq 1 + server.upstream_connections.should be >= 3 + end + sleep 0.1.seconds + server.client_connections.should eq 0 + server.upstream_connections.should eq 3 + end + end + it "can reconnect if upstream closes" do with_server do |server, proxy_url| Fiber.yield diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index b0036bf..7beeb61 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -16,8 +16,12 @@ rescue e : URI::Error exit 1 end -def with_server(idle_connection_timeout = 5, &) - server = AMQProxy::Server.new(UPSTREAM_URL) +def with_server(idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX, &) + tls = UPSTREAM_URL.scheme == "amqps" + host = UPSTREAM_URL.host || "127.0.0.1" + port = UPSTREAM_URL.port || 5672 + port = 5671 if tls && UPSTREAM_URL.port.nil? + server = AMQProxy::Server.new(host, port, tls, idle_connection_timeout, max_upstream_channels) tcp_server = TCPServer.new("127.0.0.1", 0) amqp_url = "amqp://#{tcp_server.local_address}" spawn { server.listen(tcp_server) } diff --git a/src/amqproxy/channel_pool.cr b/src/amqproxy/channel_pool.cr index c88d281..f702f16 100644 --- a/src/amqproxy/channel_pool.cr +++ b/src/amqproxy/channel_pool.cr @@ -9,7 +9,7 @@ module AMQProxy @lock = Mutex.new @upstreams = Deque(Upstream).new - def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32) + def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32, @max_upstream_channels : UInt16) spawn shrink_pool_loop, name: "shrink pool loop" end @@ -36,7 +36,7 @@ module AMQProxy end private def add_upstream - upstream = Upstream.new(@host, @port, @tls_ctx, @credentials) + upstream = Upstream.new(@host, @port, @tls_ctx, @credentials, @max_upstream_channels) Log.info { "Adding upstream connection" } @upstreams.unshift upstream spawn(name: "Upstream#read_loop") do diff --git a/src/amqproxy/cli.cr b/src/amqproxy/cli.cr index 24e5ce2..a864ea2 100644 --- a/src/amqproxy/cli.cr +++ b/src/amqproxy/cli.cr @@ -14,6 +14,7 @@ class AMQProxy::CLI @http_port = 15673 @log_level : ::Log::Severity = ::Log::Severity::Info @idle_connection_timeout : Int32 = 5 + @max_upstream_channels : UInt16 = UInt16::MAX @term_timeout = -1 @term_client_close_timeout = 0 @server : AMQProxy::Server? = nil @@ -27,6 +28,7 @@ class AMQProxy::CLI when "upstream" then @upstream = value when "log_level" then @log_level = ::Log::Severity.parse(value) when "idle_connection_timeout" then @idle_connection_timeout = value.to_i + when "max_upstream_channels" then @max_upstream_channels = value.to_u16 when "term_timeout" then @term_timeout = value.to_i when "term_client_close_timeout" then @term_client_close_timeout = value.to_i else raise "Unsupported config #{name}/#{key}" @@ -54,6 +56,7 @@ class AMQProxy::CLI @http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port @log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level @idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout + @max_upstream_channels = ENV["MAX_UPSTREAM_CHANNELS"]?.try &.to_u16 || @max_upstream_channels @term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout @term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout @upstream = ENV["AMQP_URL"]? || @upstream @@ -81,6 +84,9 @@ class AMQProxy::CLI parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v| @idle_connection_timeout = v.to_i end + parser.on("--max-upstream-channels=CHANNELS", "Maximum channels per upstream connection (default: server max or 65535)") do |v| + @max_upstream_channels = v.to_u16 + end parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v| @term_timeout = v.to_i end @@ -117,7 +123,7 @@ class AMQProxy::CLI Signal::INT.trap &->self.initiate_shutdown(Signal) Signal::TERM.trap &->self.initiate_shutdown(Signal) - server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout) + server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout, @max_upstream_channels) HTTPServer.new(server, @listen_address, @http_port.to_i) server.listen(@listen_address, @listen_port.to_i) diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 4af02f4..fe04409 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -21,10 +21,10 @@ module AMQProxy new(host, port, tls, idle_connection_timeout) end - def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5) + def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX) tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls @channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials| - hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout) + hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout, max_upstream_channels) end Log.info { "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" } end diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 681bf26..d6a7e50 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -16,7 +16,7 @@ module AMQProxy @channel_max : UInt16 @frame_max : UInt32 - def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials) + def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials, @max_upstream_channels : UInt16 = UInt16::MAX) tcp_socket = TCPSocket.new(@host, @port) tcp_socket.sync = false tcp_socket.keepalive = true @@ -203,7 +203,9 @@ module AMQProxy case tune = AMQ::Protocol::Frame.from_io(@socket) when AMQ::Protocol::Frame::Connection::Tune - channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max + server_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max + max_upstream_channels = @max_upstream_channels.zero? ? UInt16::MAX : @max_upstream_channels + channel_max = Math.min(server_max, max_upstream_channels) frame_max = tune.frame_max.zero? ? 131072_u32 : Math.min(131072_u32, tune.frame_max) tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, frame_max, tune.heartbeat) @socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian