Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ You probably want to modify `/etc/systemd/system/amqproxy.service` and configure
| Listen port | Port to listen on. This is the port which will be the target of clients | `--port` / `-p` | `LISTEN_PORT` | `[listen] > port` | `5673` |
| Log level | Controls log verbosity.<br><br>Available levels (see [84codes/logger.cr](https://github.com/84codes/logger.cr/blob/main/src/logger.cr#L86)):<br> - `DEBUG`: Low-level information for developers<br> - `INFO`: Generic (useful) information about system operation<br> - `WARN`: Warnings<br> - `ERROR`: Handleable error conditions<br> - `FATAL`: Unhandleable errors that results in a program crash | `--debug` / `-d`: Sets the level to `DEBUG` | - | `[main] > log_level` | `INFO` |
| Idle connection timeout | Maximum time in seconds an unused pooled connection stays open | `--idle-connection-timeout` / `-t` | `IDLE_CONNECTION_TIMEOUT` | `[main] > idle_connection_timeout` | `5` |
| Max upstream channels | Maximum number of channels per upstream connection. The effective limit will be the lowest value between this setting and the upstream server's channel_max from the connection tune packet | `--max-upstream-channels` | `MAX_UPSTREAM_CHANNELS` | `[main] > max_upstream_channels` | `65535` |
| Upstream | AMQP URL that points to the upstream RabbitMQ server to which the proxy should connect to. May only contain scheme, host & port (optional). Example: `amqps://rabbitmq.example.com` | Pass as argument after all options | `AMQP_URL` | `[main] > upstream` | |

### How to configure
Expand Down
1 change: 1 addition & 0 deletions config/example.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[main]
log_level = info
idle_connection_timeout = 5
max_upstream_channels = 65535
upstream = amqp://localhost:5672

[listen]
Expand Down
13 changes: 13 additions & 0 deletions spec/amqproxy/server_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ describe AMQProxy::Server do
end
end

it "creates multiple upstreams when max_upstream_channels is low" do
with_server(max_upstream_channels: 10_u16) do |server, proxy_url|
AMQP::Client.start(proxy_url) do |conn|
25.times { conn.channel }
server.client_connections.should eq 1
server.upstream_connections.should be >= 3
end
sleep 0.1.seconds
server.client_connections.should eq 0
server.upstream_connections.should eq 3
end
end

it "can reconnect if upstream closes" do
with_server do |server, proxy_url|
Fiber.yield
Expand Down
8 changes: 6 additions & 2 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ rescue e : URI::Error
exit 1
end

def with_server(idle_connection_timeout = 5, &)
server = AMQProxy::Server.new(UPSTREAM_URL)
def with_server(idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX, &)
tls = UPSTREAM_URL.scheme == "amqps"
host = UPSTREAM_URL.host || "127.0.0.1"
port = UPSTREAM_URL.port || 5672
port = 5671 if tls && UPSTREAM_URL.port.nil?
server = AMQProxy::Server.new(host, port, tls, idle_connection_timeout, max_upstream_channels)
tcp_server = TCPServer.new("127.0.0.1", 0)
amqp_url = "amqp://#{tcp_server.local_address}"
spawn { server.listen(tcp_server) }
Expand Down
4 changes: 2 additions & 2 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module AMQProxy
@lock = Mutex.new
@upstreams = Deque(Upstream).new

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32)
def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32, @max_upstream_channels : UInt16)
spawn shrink_pool_loop, name: "shrink pool loop"
end

Expand All @@ -36,7 +36,7 @@ module AMQProxy
end

private def add_upstream
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials)
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials, @max_upstream_channels)
Log.info { "Adding upstream connection" }
@upstreams.unshift upstream
spawn(name: "Upstream#read_loop") do
Expand Down
8 changes: 7 additions & 1 deletion src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class AMQProxy::CLI
@http_port = 15673
@log_level : ::Log::Severity = ::Log::Severity::Info
@idle_connection_timeout : Int32 = 5
@max_upstream_channels : UInt16 = UInt16::MAX
@term_timeout = -1
@term_client_close_timeout = 0
@server : AMQProxy::Server? = nil
Expand All @@ -27,6 +28,7 @@ class AMQProxy::CLI
when "upstream" then @upstream = value
when "log_level" then @log_level = ::Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
when "max_upstream_channels" then @max_upstream_channels = value.to_u16
when "term_timeout" then @term_timeout = value.to_i
when "term_client_close_timeout" then @term_client_close_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
Expand Down Expand Up @@ -54,6 +56,7 @@ class AMQProxy::CLI
@http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port
@log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level
@idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout
@max_upstream_channels = ENV["MAX_UPSTREAM_CHANNELS"]?.try &.to_u16 || @max_upstream_channels
@term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout
@term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout
@upstream = ENV["AMQP_URL"]? || @upstream
Expand Down Expand Up @@ -81,6 +84,9 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("--max-upstream-channels=CHANNELS", "Maximum channels per upstream connection (default: server max or 65535)") do |v|
@max_upstream_channels = v.to_u16
end
parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v|
@term_timeout = v.to_i
end
Expand Down Expand Up @@ -117,7 +123,7 @@ class AMQProxy::CLI
Signal::INT.trap &->self.initiate_shutdown(Signal)
Signal::TERM.trap &->self.initiate_shutdown(Signal)

server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)
server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout, @max_upstream_channels)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it's @max_upstream_channels is set to 0? It should probably default to `UInt16::MAX in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes setting that to 0 does indeed break amqproxy, and yes that sounds like how it "should" work.


HTTPServer.new(server, @listen_address, @http_port.to_i)
server.listen(@listen_address, @listen_port.to_i)
Expand Down
4 changes: 2 additions & 2 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ module AMQProxy
new(host, port, tls, idle_connection_timeout)
end

def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5)
def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX)
tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls
@channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials|
hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout)
hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout, max_upstream_channels)
end
Log.info { "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" }
end
Expand Down
5 changes: 3 additions & 2 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module AMQProxy
@channel_max : UInt16
@frame_max : UInt32

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials)
def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials, @max_upstream_channels : UInt16 = UInt16::MAX)
tcp_socket = TCPSocket.new(@host, @port)
tcp_socket.sync = false
tcp_socket.keepalive = true
Expand Down Expand Up @@ -203,7 +203,8 @@ module AMQProxy

case tune = AMQ::Protocol::Frame.from_io(@socket)
when AMQ::Protocol::Frame::Connection::Tune
channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max
server_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max
channel_max = Math.min(server_max, @max_upstream_channels)
frame_max = tune.frame_max.zero? ? 131072_u32 : Math.min(131072_u32, tune.frame_max)
tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, frame_max, tune.heartbeat)
@socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian
Expand Down