Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ You probably want to modify `/etc/systemd/system/amqproxy.service` and configure
| Listen port | Port to listen on. This is the port which will be the target of clients | `--port` / `-p` | `LISTEN_PORT` | `[listen] > port` | `5673` |
| Log level | Controls log verbosity.<br><br>Available levels (see [84codes/logger.cr](https://github.com/84codes/logger.cr/blob/main/src/logger.cr#L86)):<br> - `DEBUG`: Low-level information for developers<br> - `INFO`: Generic (useful) information about system operation<br> - `WARN`: Warnings<br> - `ERROR`: Handleable error conditions<br> - `FATAL`: Unhandleable errors that results in a program crash | `--debug` / `-d`: Sets the level to `DEBUG` | - | `[main] > log_level` | `INFO` |
| Idle connection timeout | Maximum time in seconds an unused pooled connection stays open | `--idle-connection-timeout` / `-t` | `IDLE_CONNECTION_TIMEOUT` | `[main] > idle_connection_timeout` | `5` |
| Max upstream channels | Maximum number of channels per upstream connection. The effective limit will be the lowest value between this setting and the upstream server's channel_max from the connection tune packet | `--max-upstream-channels` | `MAX_UPSTREAM_CHANNELS` | `[main] > max_upstream_channels` | `65535` |
| Upstream | AMQP URL that points to the upstream RabbitMQ server to which the proxy should connect to. May only contain scheme, host & port (optional). Example: `amqps://rabbitmq.example.com` | Pass as argument after all options | `AMQP_URL` | `[main] > upstream` | |

### How to configure
Expand Down
1 change: 1 addition & 0 deletions config/example.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[main]
log_level = info
idle_connection_timeout = 5
max_upstream_channels = 65535
upstream = amqp://localhost:5672

[listen]
Expand Down
13 changes: 13 additions & 0 deletions spec/amqproxy/server_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ describe AMQProxy::Server do
end
end

it "creates multiple upstreams when max_upstream_channels is low" do
with_server(max_upstream_channels: 10_u16) do |server, proxy_url|
AMQP::Client.start(proxy_url) do |conn|
25.times { conn.channel }
server.client_connections.should eq 1
server.upstream_connections.should be >= 3
end
sleep 0.1.seconds
server.client_connections.should eq 0
server.upstream_connections.should eq 3
end
end

it "can reconnect if upstream closes" do
with_server do |server, proxy_url|
Fiber.yield
Expand Down
8 changes: 6 additions & 2 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ rescue e : URI::Error
exit 1
end

def with_server(idle_connection_timeout = 5, &)
server = AMQProxy::Server.new(UPSTREAM_URL)
def with_server(idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX, &)
tls = UPSTREAM_URL.scheme == "amqps"
host = UPSTREAM_URL.host || "127.0.0.1"
port = UPSTREAM_URL.port || 5672
port = 5671 if tls && UPSTREAM_URL.port.nil?
server = AMQProxy::Server.new(host, port, tls, idle_connection_timeout, max_upstream_channels)
tcp_server = TCPServer.new("127.0.0.1", 0)
amqp_url = "amqp://#{tcp_server.local_address}"
spawn { server.listen(tcp_server) }
Expand Down
4 changes: 2 additions & 2 deletions src/amqproxy/channel_pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module AMQProxy
@lock = Mutex.new
@upstreams = Deque(Upstream).new

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32)
def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, @credentials : Credentials, @idle_connection_timeout : Int32, @max_upstream_channels : UInt16)
spawn shrink_pool_loop, name: "shrink pool loop"
end

Expand All @@ -36,7 +36,7 @@ module AMQProxy
end

private def add_upstream
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials)
upstream = Upstream.new(@host, @port, @tls_ctx, @credentials, @max_upstream_channels)
Log.info { "Adding upstream connection" }
@upstreams.unshift upstream
spawn(name: "Upstream#read_loop") do
Expand Down
8 changes: 7 additions & 1 deletion src/amqproxy/cli.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class AMQProxy::CLI
@http_port = 15673
@log_level : ::Log::Severity = ::Log::Severity::Info
@idle_connection_timeout : Int32 = 5
@max_upstream_channels : UInt16 = UInt16::MAX
@term_timeout = -1
@term_client_close_timeout = 0
@server : AMQProxy::Server? = nil
Expand All @@ -27,6 +28,7 @@ class AMQProxy::CLI
when "upstream" then @upstream = value
when "log_level" then @log_level = ::Log::Severity.parse(value)
when "idle_connection_timeout" then @idle_connection_timeout = value.to_i
when "max_upstream_channels" then @max_upstream_channels = value.to_u16
when "term_timeout" then @term_timeout = value.to_i
when "term_client_close_timeout" then @term_client_close_timeout = value.to_i
else raise "Unsupported config #{name}/#{key}"
Expand Down Expand Up @@ -54,6 +56,7 @@ class AMQProxy::CLI
@http_port = ENV["HTTP_PORT"]?.try &.to_i || @http_port
@log_level = ENV["LOG_LEVEL"]?.try { |level| ::Log::Severity.parse(level) } || @log_level
@idle_connection_timeout = ENV["IDLE_CONNECTION_TIMEOUT"]?.try &.to_i || @idle_connection_timeout
@max_upstream_channels = ENV["MAX_UPSTREAM_CHANNELS"]?.try &.to_u16 || @max_upstream_channels
@term_timeout = ENV["TERM_TIMEOUT"]?.try &.to_i || @term_timeout
@term_client_close_timeout = ENV["TERM_CLIENT_CLOSE_TIMEOUT"]?.try &.to_i || @term_client_close_timeout
@upstream = ENV["AMQP_URL"]? || @upstream
Expand Down Expand Up @@ -81,6 +84,9 @@ class AMQProxy::CLI
parser.on("-t IDLE_CONNECTION_TIMEOUT", "--idle-connection-timeout=SECONDS", "Maximum time in seconds an unused pooled connection stays open (default 5s)") do |v|
@idle_connection_timeout = v.to_i
end
parser.on("--max-upstream-channels=CHANNELS", "Maximum channels per upstream connection (default: server max or 65535)") do |v|
@max_upstream_channels = v.to_u16
end
parser.on("--term-timeout=SECONDS", "At TERM the server waits SECONDS seconds for clients to gracefully close their sockets after Close has been sent (default: infinite)") do |v|
@term_timeout = v.to_i
end
Expand Down Expand Up @@ -117,7 +123,7 @@ class AMQProxy::CLI
Signal::INT.trap &->self.initiate_shutdown(Signal)
Signal::TERM.trap &->self.initiate_shutdown(Signal)

server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout)
server = @server = AMQProxy::Server.new(u.hostname || "", port, tls, @idle_connection_timeout, @max_upstream_channels)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if it's @max_upstream_channels is set to 0? It should probably default to `UInt16::MAX in that case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes setting that to 0 does indeed break amqproxy, and yes that sounds like how it "should" work.


HTTPServer.new(server, @listen_address, @http_port.to_i)
server.listen(@listen_address, @listen_port.to_i)
Expand Down
4 changes: 2 additions & 2 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ module AMQProxy
new(host, port, tls, idle_connection_timeout)
end

def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5)
def initialize(upstream_host, upstream_port, upstream_tls, idle_connection_timeout = 5, max_upstream_channels = UInt16::MAX)
tls_ctx = OpenSSL::SSL::Context::Client.new if upstream_tls
@channel_pools = Hash(Credentials, ChannelPool).new do |hash, credentials|
hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout)
hash[credentials] = ChannelPool.new(upstream_host, upstream_port, tls_ctx, credentials, idle_connection_timeout, max_upstream_channels)
end
Log.info { "Proxy upstream: #{upstream_host}:#{upstream_port} #{upstream_tls ? "TLS" : ""}" }
end
Expand Down
6 changes: 4 additions & 2 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module AMQProxy
@channel_max : UInt16
@frame_max : UInt32

def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials)
def initialize(@host : String, @port : Int32, @tls_ctx : OpenSSL::SSL::Context::Client?, credentials, @max_upstream_channels : UInt16 = UInt16::MAX)
tcp_socket = TCPSocket.new(@host, @port)
tcp_socket.sync = false
tcp_socket.keepalive = true
Expand Down Expand Up @@ -203,7 +203,9 @@ module AMQProxy

case tune = AMQ::Protocol::Frame.from_io(@socket)
when AMQ::Protocol::Frame::Connection::Tune
channel_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max
server_max = tune.channel_max.zero? ? UInt16::MAX : tune.channel_max
max_upstream_channels = @max_upstream_channels.zero? ? UInt16::MAX : @max_upstream_channels
channel_max = Math.min(server_max, max_upstream_channels)
frame_max = tune.frame_max.zero? ? 131072_u32 : Math.min(131072_u32, tune.frame_max)
tune_ok = AMQ::Protocol::Frame::Connection::TuneOk.new(channel_max, frame_max, tune.heartbeat)
@socket.write_bytes tune_ok, IO::ByteFormat::NetworkEndian
Expand Down