Skip to content

Commit 2318bd6

Browse files
authored
Merge pull request #191 from grepplabs/pr-183-alt
Rework of PR# 183 feat: Add support for deterministic listener ports (based on broker ID)
2 parents e5074c8 + 93770ec commit 2318bd6

File tree

8 files changed

+279
-76
lines changed

8 files changed

+279
-76
lines changed

cmd/kafka-proxy/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func initFlags() {
8989
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
9090
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
9191
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
92+
Server.Flags().BoolVar(&c.Proxy.DeterministicListeners, "deterministic-listeners", false, "Enable deterministic listeners (listener port = min port + broker id).")
9293
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
9394
Server.Flags().IntVar(&c.Proxy.DynamicSequentialMinPort, "dynamic-sequential-min-port", 0, "If set to non-zero, makes the dynamic listener use a sequential port starting with this value rather than a random port every time.")
9495

config/config.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@ var (
2222
Version = "unknown"
2323
)
2424

25-
type NetAddressMappingFunc func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error)
25+
type NetAddressMappingFunc func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error)
2626

2727
type ListenerConfig struct {
2828
BrokerAddress string
2929
ListenerAddress string
3030
AdvertisedAddress string
3131
}
32+
3233
type DialAddressMapping struct {
3334
SourceAddress string
3435
DestinationAddress string
@@ -74,6 +75,7 @@ type Config struct {
7475
DefaultListenerIP string
7576
BootstrapServers []ListenerConfig
7677
ExternalServers []ListenerConfig
78+
DeterministicListeners bool
7779
DialAddressMappings []DialAddressMapping
7880
DisableDynamicListeners bool
7981
DynamicAdvertisedListener string

proxy/processor_default_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package proxy
33
import (
44
"bytes"
55
"encoding/hex"
6+
"testing"
7+
"time"
8+
69
"github.com/grepplabs/kafka-proxy/proxy/protocol"
710
"github.com/pkg/errors"
811
"github.com/stretchr/testify/assert"
9-
"testing"
10-
"time"
1112
)
1213

1314
func TestHandleRequest(t *testing.T) {
@@ -130,7 +131,7 @@ func TestHandleRequest(t *testing.T) {
130131
}
131132

132133
func TestHandleResponse(t *testing.T) {
133-
netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
134+
netAddressMappingFunc := func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
134135
if brokerHost == "localhost" {
135136
switch brokerPort {
136137
case 19092:

proxy/protocol/responses.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const (
1414
brokersKeyName = "brokers"
1515
hostKeyName = "host"
1616
portKeyName = "port"
17+
nodeKeyName = "node_id"
1718

1819
coordinatorKeyName = "coordinator"
1920
coordinatorsKeyName = "coordinators"
@@ -26,7 +27,7 @@ var (
2627

2728
func createMetadataResponseSchemaVersions() []Schema {
2829
metadataBrokerV0 := NewSchema("metadata_broker_v0",
29-
&Mfield{Name: "node_id", Ty: TypeInt32},
30+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
3031
&Mfield{Name: hostKeyName, Ty: TypeStr},
3132
&Mfield{Name: portKeyName, Ty: TypeInt32},
3233
)
@@ -51,14 +52,14 @@ func createMetadataResponseSchemaVersions() []Schema {
5152
)
5253

5354
metadataBrokerV1 := NewSchema("metadata_broker_v1",
54-
&Mfield{Name: "node_id", Ty: TypeInt32},
55+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
5556
&Mfield{Name: hostKeyName, Ty: TypeStr},
5657
&Mfield{Name: portKeyName, Ty: TypeInt32},
5758
&Mfield{Name: "rack", Ty: TypeNullableStr},
5859
)
5960

6061
metadataBrokerSchema9 := NewSchema("metadata_broker_schema9",
61-
&Mfield{Name: "node_id", Ty: TypeInt32},
62+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
6263
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
6364
&Mfield{Name: portKeyName, Ty: TypeInt32},
6465
&Mfield{Name: "rack", Ty: TypeCompactNullableStr},
@@ -248,13 +249,13 @@ func createMetadataResponseSchemaVersions() []Schema {
248249

249250
func createFindCoordinatorResponseSchemaVersions() []Schema {
250251
findCoordinatorBrokerV0 := NewSchema("find_coordinator_broker_v0",
251-
&Mfield{Name: "node_id", Ty: TypeInt32},
252+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
252253
&Mfield{Name: hostKeyName, Ty: TypeStr},
253254
&Mfield{Name: portKeyName, Ty: TypeInt32},
254255
)
255256

256257
findCoordinatorBrokerSchema9 := NewSchema("find_coordinator_broker_schema9",
257-
&Mfield{Name: "node_id", Ty: TypeInt32},
258+
&Mfield{Name: nodeKeyName, Ty: TypeInt32},
258259
&Mfield{Name: hostKeyName, Ty: TypeCompactStr},
259260
&Mfield{Name: portKeyName, Ty: TypeInt32},
260261
)
@@ -320,12 +321,16 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
320321
if !ok {
321322
return errors.New("broker.port not found")
322323
}
324+
nodeId, ok := broker.Get(nodeKeyName).(int32)
325+
if !ok {
326+
return errors.New("broker.node_id not found")
327+
}
323328

324329
if host == "" && port <= 0 {
325330
continue
326331
}
327332

328-
newHost, newPort, err := fn(host, port)
333+
newHost, newPort, err := fn(host, port, nodeId)
329334
if err != nil {
330335
return err
331336
}
@@ -336,7 +341,7 @@ func modifyMetadataResponse(decodedStruct *Struct, fn config.NetAddressMappingFu
336341
}
337342
}
338343
if port != newPort {
339-
err = broker.Replace(portKeyName, int32(newPort))
344+
err = broker.Replace(portKeyName, newPort)
340345
if err != nil {
341346
return err
342347
}
@@ -383,12 +388,16 @@ func modifyCoordinator(decodedStruct *Struct, fn config.NetAddressMappingFunc) e
383388
if !ok {
384389
return errors.New("coordinator.port not found")
385390
}
391+
nodeId, ok := coordinator.Get(nodeKeyName).(int32)
392+
if !ok {
393+
return errors.New("coordinator.node_id not found")
394+
}
386395

387396
if host == "" && port <= 0 {
388397
return nil
389398
}
390399

391-
newHost, newPort, err := fn(host, port)
400+
newHost, newPort, err := fn(host, port, nodeId)
392401
if err != nil {
393402
return err
394403
}

proxy/protocol/responses_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package protocol
33
import (
44
"encoding/hex"
55
"fmt"
6-
"github.com/google/uuid"
76
"reflect"
87
"strings"
98
"testing"
109

10+
"github.com/google/uuid"
11+
1112
"github.com/grepplabs/kafka-proxy/config"
1213
"github.com/pkg/errors"
1314
"github.com/stretchr/testify/assert"
@@ -20,7 +21,7 @@ var (
2021
// topic_metadata
2122
0x00, 0x00, 0x00, 0x00}
2223

23-
testResponseModifier = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
24+
testResponseModifier = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
2425
if brokerHost == "localhost" && brokerPort == 51 {
2526
return "myhost1", 34001, nil
2627
} else if brokerHost == "google.com" && brokerPort == 273 {
@@ -31,7 +32,7 @@ var (
3132
return "", 0, errors.New("unexpected data")
3233
}
3334

34-
testResponseModifier2 = func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
35+
testResponseModifier2 = func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
3536
if brokerHost == "localhost" && brokerPort == 19092 {
3637
return "myhost1", 34001, nil
3738
} else if brokerHost == "localhost" && brokerPort == 29092 {
@@ -374,7 +375,7 @@ func TestMetadataResponseV0(t *testing.T) {
374375
a.Nil(err)
375376
a.Equal(bytes, resp)
376377

377-
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) {
378+
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, func(brokerHost string, brokerPort int32, brokerId int32) (listenerHost string, listenerPort int32, err error) {
378379
if brokerHost == "localhost" && brokerPort == 51 {
379380
return "azure.microsoft.com", 34001, nil
380381
} else if brokerHost == "google.com" && brokerPort == 273 {

0 commit comments

Comments
 (0)