Skip to content

Commit a42c4b5

Browse files
committed
Allow constraining dynamic listener ports to a specific port range
1 parent 8492b62 commit a42c4b5

File tree

4 files changed

+126
-12
lines changed

4 files changed

+126
-12
lines changed

Diff for: cmd/kafka-proxy/server.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ func initFlags() {
9393
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
9494
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
9595
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
96-
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
96+
Server.Flags().Uint16Var(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
97+
Server.Flags().Uint16Var(&c.Proxy.DynamicSequentialMaxPorts, "dynamic-sequential-max-ports", 0, "If set to non-zero, ports are allocated sequentially from the half open interval [dynamic-sequential-min-port, dynamic-sequential-min-port + dynamic-sequential-max-ports)")
9798

9899
Server.Flags().IntVar(&c.Proxy.RequestBufferSize, "proxy-request-buffer-size", 4096, "Request buffer size pro tcp connection")
99100
Server.Flags().IntVar(&c.Proxy.ResponseBufferSize, "proxy-response-buffer-size", 4096, "Response buffer size pro tcp connection")

Diff for: cmd/kafka-proxy/server_test.go

+72
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,78 @@ func TestSameClientCertEnabledWithMissingFlags(t *testing.T) {
228228
})
229229
}
230230

231+
func TestDynamicPortIntervals(t *testing.T) {
232+
233+
setupBootstrapServersMappingTest()
234+
noMinPort := []string{"cobra.test",
235+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
236+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
237+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
238+
"--deterministic-listeners", "",
239+
}
240+
241+
noMinPort2 := []string{"cobra.test",
242+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
243+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
244+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
245+
"--dynamic-sequential-max-ports", "2000",
246+
}
247+
248+
t.Run("MinPortMandatoryForDeterministicListeners", func(t *testing.T) {
249+
serverPreRunFailure(t, noMinPort, "Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DeterministicListeners is enabled")
250+
})
251+
t.Run("MinPortMandatoryIfMaxPortsIsSet", func(t *testing.T) {
252+
serverPreRunFailure(t, noMinPort2, "Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DynamicSequentialMaxPorts is set")
253+
})
254+
255+
args1 := []string{"cobra.test",
256+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
257+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
258+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
259+
"--dynamic-sequential-min-port", "2000",
260+
"--dynamic-sequential-max-ports", "2000",
261+
}
262+
_ = Server.ParseFlags(args1)
263+
err := Server.PreRunE(nil, args1)
264+
a := assert.New(t)
265+
a.Nil(err)
266+
267+
args2 := []string{"cobra.test",
268+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
269+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
270+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
271+
"--dynamic-sequential-min-port", "2000",
272+
"--dynamic-sequential-max-ports", "2000",
273+
"--deterministic-listeners", "",
274+
}
275+
_ = Server.ParseFlags(args2)
276+
err = Server.PreRunE(nil, args2)
277+
a = assert.New(t)
278+
a.Nil(err)
279+
280+
args3 := []string{"cobra.test",
281+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
282+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
283+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
284+
"--dynamic-sequential-min-port", "2000",
285+
"--deterministic-listeners", "",
286+
}
287+
_ = Server.ParseFlags(args3)
288+
err = Server.PreRunE(nil, args3)
289+
a = assert.New(t)
290+
a.Nil(err)
291+
292+
args4 := []string{"cobra.test",
293+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
294+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32402",
295+
"--bootstrap-server-mapping", "192.168.99.100:32402,0.0.0.0:32403",
296+
}
297+
_ = Server.ParseFlags(args4)
298+
err = Server.PreRunE(nil, args4)
299+
a = assert.New(t)
300+
a.Nil(err)
301+
}
302+
231303
func serverPreRunFailure(t *testing.T, cmdLineFlags []string, expectedErrorMsg string) {
232304
setupBootstrapServersMappingTest()
233305

Diff for: config/config.go

+20-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ type Config struct {
8181
DialAddressMappings []DialAddressMapping
8282
DisableDynamicListeners bool
8383
DynamicAdvertisedListener string
84-
DynamicSequentialMinPort int
84+
DynamicSequentialMinPort uint16
85+
DynamicSequentialMaxPorts uint16
8586
RequestBufferSize int
8687
ResponseBufferSize int
8788
ListenerReadBufferSize int // SO_RCVBUF
@@ -432,5 +433,23 @@ func (c *Config) Validate() error {
432433
}
433434

434435
}
436+
437+
if !c.Proxy.DisableDynamicListeners {
438+
if c.Proxy.DynamicSequentialMinPort == 0 && c.Proxy.DeterministicListeners {
439+
// dynamic-sequential-min-port must be set for deterministic-listeners to be enabled, as the latter
440+
// does not work with random (OS allocated ephemeral) ports.
441+
return errors.New("Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DeterministicListeners is enabled")
442+
}
443+
if c.Proxy.DynamicSequentialMinPort == 0 && c.Proxy.DynamicSequentialMaxPorts > 0 {
444+
// dynamic-sequential-min-port must be set if dynamic-sequential-max-ports is set, as the latter
445+
// does not work with random (OS allocated ephemeral) ports.
446+
return errors.New("Proxy.DynamicSequentialMinPort must be set to a positive value between 1 and 65535 when Proxy.DynamicSequentialMaxPorts is set")
447+
}
448+
// Set default for DynamicSequentialMaxPorts if DynamicSequentialMinPort is set, to make sure
449+
// ports never exceed the 16-bit max port number of 65535.
450+
if c.Proxy.DynamicSequentialMaxPorts == 0 && c.Proxy.DynamicSequentialMinPort > 0 {
451+
c.Proxy.DynamicSequentialMaxPorts = uint16(65536 - uint32(c.Proxy.DynamicSequentialMinPort))
452+
}
453+
}
435454
return nil
436455
}

Diff for: proxy/proxy.go

+32-10
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ type Listeners struct {
2828

2929
listenFunc ListenFunc
3030

31-
deterministicListeners bool
32-
disableDynamicListeners bool
33-
dynamicSequentialMinPort int
31+
deterministicListeners bool
32+
disableDynamicListeners bool
33+
dynamicSequentialMinPort uint16
34+
currentDynamicPortCounter uint64
35+
dynamicSequentialMaxPorts uint16
3436

3537
brokerToListenerConfig map[string]*ListenerConfig
3638
lock sync.RWMutex
@@ -74,6 +76,8 @@ func NewListeners(cfg *config.Config) (*Listeners, error) {
7476
deterministicListeners: cfg.Proxy.DeterministicListeners,
7577
disableDynamicListeners: cfg.Proxy.DisableDynamicListeners,
7678
dynamicSequentialMinPort: cfg.Proxy.DynamicSequentialMinPort,
79+
currentDynamicPortCounter: 0,
80+
dynamicSequentialMaxPorts: cfg.Proxy.DynamicSequentialMaxPorts,
7781
}, nil
7882
}
7983

@@ -149,6 +153,16 @@ func (p *Listeners) findListenerConfig(brokerId int32) *ListenerConfig {
149153
return nil
150154
}
151155

156+
// Make sure all dynamically allocated ports are within the half open interval
157+
// [dynamicSequentialMinPort, dynamicSequentialMinPort + dynamicSequentialMaxPorts).
158+
func (p *Listeners) nextDynamicPort(portOffset uint64, brokerAddress string, brokerId int32) (uint16, error) {
159+
port := p.dynamicSequentialMinPort + uint16(portOffset%uint64(p.dynamicSequentialMaxPorts))
160+
if port < p.dynamicSequentialMinPort {
161+
return 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, port)
162+
}
163+
return port, nil
164+
}
165+
152166
func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32) (string, int32, error) {
153167
p.lock.Lock()
154168
defer p.lock.Unlock()
@@ -162,11 +176,11 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
162176
if brokerId < 0 {
163177
return "", 0, fmt.Errorf("brokerId is negative %s %d", brokerAddress, brokerId)
164178
}
165-
deterministicPort := p.dynamicSequentialMinPort + int(brokerId)
166-
if deterministicPort < p.dynamicSequentialMinPort {
167-
return "", 0, fmt.Errorf("port assignment overflow %s %d: %d", brokerAddress, brokerId, deterministicPort)
179+
deterministicPort, err := p.nextDynamicPort(uint64(brokerId), brokerAddress, brokerId)
180+
if err != nil {
181+
return "", 0, err
168182
}
169-
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(deterministicPort))
183+
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(int(deterministicPort)))
170184
cfg := p.findListenerConfig(brokerId)
171185
if cfg != nil {
172186
oldBrokerAddress := cfg.GetBrokerAddress()
@@ -179,9 +193,17 @@ func (p *Listeners) ListenDynamicInstance(brokerAddress string, brokerId int32)
179193
return util.SplitHostPort(cfg.AdvertisedAddress)
180194
}
181195
} else {
182-
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(p.dynamicSequentialMinPort))
183-
if p.dynamicSequentialMinPort != 0 {
184-
p.dynamicSequentialMinPort += 1
196+
if p.dynamicSequentialMinPort == 0 {
197+
// Use random (non sequential) ephemeral free port, allocated by OS.
198+
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(0))
199+
} else {
200+
// Use sequentially allocated port.
201+
port, err := p.nextDynamicPort(uint64(p.currentDynamicPortCounter), brokerAddress, brokerId)
202+
if err != nil {
203+
return "", 0, err
204+
}
205+
listenerAddress = net.JoinHostPort(p.defaultListenerIP, strconv.Itoa(int(port)))
206+
p.currentDynamicPortCounter += 1
185207
}
186208
}
187209
cfg := NewListenerConfig(brokerAddress, listenerAddress, "", brokerId)

0 commit comments

Comments
 (0)