Skip to content

Commit 345c16e

Browse files
committed
Implement dial address mapping
1 parent 1f64067 commit 345c16e

File tree

7 files changed

+133
-18
lines changed

7 files changed

+133
-18
lines changed

README.md

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ See:
8383
--debug-enable Enable Debug endpoint
8484
--debug-listen-address string Debug listen address (default "0.0.0.0:6060")
8585
--default-listener-ip string Default listener IP (default "127.0.0.1")
86+
--dial-address-mapping stringArray Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment
8687
--dynamic-listeners-disable Disable dynamic listeners.
8788
--external-server-mapping stringArray Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started
8889
--forbidden-api-keys intSlice Forbidden Kafka request types. The restriction should prevent some Kafka operations e.g. 20 - DeleteTopics
@@ -378,7 +379,7 @@ spec:
378379
secretName: tls-client-key-file
379380
```
380381

381-
### Connect to Kafka running in Kubernetes example
382+
### Connect to Kafka running in Kubernetes example (kafka proxy runs in cluster)
382383

383384
```yaml
384385

@@ -484,9 +485,29 @@ spec:
484485
kubectl port-forward kafka-proxy-0 32400:32400 32401:32401 32402:32402
485486
```
486487

487-
Use localhost:32400, localhost:32401 and localhost:32402 as boostrap servers
488+
Use localhost:32400, localhost:32401 and localhost:32402 as bootstrap servers
488489

489490

491+
### Connect to Kafka running in Kubernetes example (kafka proxy runs locally)
492+
493+
kafka.properties of one node Kafka
494+
495+
```
496+
broker.id=0
497+
advertised.listeners=PLAINTEXT://kafka-0.kafka-headless.kafka:9092
498+
...
499+
```
500+
501+
```bash
502+
kubectl port-forward -n kafka kafka-0 9092:9092
503+
```
504+
505+
```bash
506+
kafka-proxy server --bootstrap-server-mapping "127.0.0.1:9092,0.0.0.0:19092" --dial-address-mapping "kafka-0.kafka-headless.kafka:9092,0.0.0.0:9092"
507+
```
508+
509+
Use localhost:19092 as bootstrap servers
510+
490511
### Embedded third-party source code
491512

492513
* [Cloud SQL Proxy](https://github.com/GoogleCloudPlatform/cloudsql-proxy)

cmd/kafka-proxy/server.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ var (
4040

4141
bootstrapServersMapping = make([]string, 0)
4242
externalServersMapping = make([]string, 0)
43+
dialAddressMapping = make([]string, 0)
4344
)
4445

4546
var Server = &cobra.Command{
@@ -57,6 +58,9 @@ var Server = &cobra.Command{
5758
if err := c.InitExternalServers(getOrEnvStringSlice(externalServersMapping, "EXTERNAL_SERVER_MAPPING")); err != nil {
5859
return err
5960
}
61+
if err := c.InitDialAddressMappings(getOrEnvStringSlice(dialAddressMapping, "DIAL_ADDRESS_MAPPING")); err != nil {
62+
return err
63+
}
6064
if err := c.Validate(); err != nil {
6165
return err
6266
}
@@ -81,6 +85,7 @@ func initFlags() {
8185
Server.Flags().StringVar(&c.Proxy.DefaultListenerIP, "default-listener-ip", "127.0.0.1", "Default listener IP")
8286
Server.Flags().StringArrayVar(&bootstrapServersMapping, "bootstrap-server-mapping", []string{}, "Mapping of Kafka bootstrap server address to local address (host:port,host:port(,advhost:advport))")
8387
Server.Flags().StringArrayVar(&externalServersMapping, "external-server-mapping", []string{}, "Mapping of Kafka server address to external address (host:port,host:port). A listener for the external address is not started")
88+
Server.Flags().StringArrayVar(&dialAddressMapping, "dial-address-mapping", []string{}, "Mapping of target broker address to new one (host:port,host:port). The mapping is performed during connection establishment")
8489
Server.Flags().BoolVar(&c.Proxy.DisableDynamicListeners, "dynamic-listeners-disable", false, "Disable dynamic listeners.")
8590

8691
Server.Flags().IntVar(&c.Proxy.RequestBufferSize, "proxy-request-buffer-size", 4096, "Request buffer size pro tcp connection")
@@ -173,7 +178,6 @@ func initFlags() {
173178
Server.Flags().StringVar(&c.Log.Level, "log-level", "info", "Log level debug, info, warning, error, fatal or panic")
174179
Server.Flags().StringVar(&c.Log.LevelFieldName, "log-level-fieldname", "@level", "Log level fieldname for json format")
175180

176-
177181
// Connect through Socks5 or HTTP CONNECT to Kafka
178182
Server.Flags().StringVar(&c.ForwardProxy.Url, "forward-proxy", "", "URL of the forward proxy. Supported schemas are socks5 and http")
179183

cmd/kafka-proxy/server_test.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ func setupBootstrapServersMappingTest() {
1111
Server.ResetFlags()
1212
c = new(config.Config)
1313
initFlags()
14-
os.Setenv("BOOTSTRAP_SERVER_MAPPING", "")
15-
os.Setenv("EXTERNAL_SERVER_MAPPING", "")
14+
_ = os.Setenv("BOOTSTRAP_SERVER_MAPPING", "")
15+
_ = os.Setenv("EXTERNAL_SERVER_MAPPING", "")
16+
_ = os.Setenv("DIAL_ADDRESS_MAPPING", "")
1617
}
1718

1819
func TestBootstrapServersMappingFromFlags(t *testing.T) {
@@ -24,7 +25,7 @@ func TestBootstrapServersMappingFromFlags(t *testing.T) {
2425
"--bootstrap-server-mapping", "kafka-2.example.com:9092,0.0.0.0:32403,kafka-2.grepplabs.com:9092",
2526
}
2627

27-
Server.ParseFlags(args)
28+
_ = Server.ParseFlags(args)
2829
err := Server.PreRunE(nil, args)
2930
a := assert.New(t)
3031
a.Nil(err)
@@ -44,13 +45,34 @@ func TestBootstrapServersMappingFromFlags(t *testing.T) {
4445

4546
}
4647

48+
func TestDialMappingFromFlags(t *testing.T) {
49+
setupBootstrapServersMappingTest()
50+
51+
args := []string{"cobra.test",
52+
"--bootstrap-server-mapping", "192.168.99.100:32401,0.0.0.0:32401",
53+
"--dial-address-mapping", "service-kafka-0.service-kafka-headless.service:9092,0.0.0.0:19092",
54+
"--dial-address-mapping", "192.168.99.100:32402,0.0.0.0:32402",
55+
}
56+
57+
_ = Server.ParseFlags(args)
58+
err := Server.PreRunE(nil, args)
59+
a := assert.New(t)
60+
a.Nil(err)
61+
a.Len(c.Proxy.DialAddressMappings, 2)
62+
63+
a.Equal(c.Proxy.DialAddressMappings[0].SourceAddress, "service-kafka-0.service-kafka-headless.service:9092")
64+
a.Equal(c.Proxy.DialAddressMappings[0].DestinationAddress, "0.0.0.0:19092")
65+
66+
a.Equal(c.Proxy.DialAddressMappings[1].SourceAddress, "192.168.99.100:32402")
67+
a.Equal(c.Proxy.DialAddressMappings[1].DestinationAddress, "0.0.0.0:32402")
68+
}
4769
func TestBootstrapServersMappingFromEnv(t *testing.T) {
4870
setupBootstrapServersMappingTest()
4971

50-
os.Setenv("BOOTSTRAP_SERVER_MAPPING", "192.168.99.100:32404,0.0.0.0:32404 kafka-5.example.com:9092,0.0.0.0:32405,kafka-5.grepplabs.com:9092")
72+
_ = os.Setenv("BOOTSTRAP_SERVER_MAPPING", "192.168.99.100:32404,0.0.0.0:32404 kafka-5.example.com:9092,0.0.0.0:32405,kafka-5.grepplabs.com:9092")
5173

5274
var args []string
53-
Server.ParseFlags(args)
75+
_ = Server.ParseFlags(args)
5476
err := Server.PreRunE(nil, args)
5577
a := assert.New(t)
5678
a.Nil(err)
@@ -70,7 +92,7 @@ func TestEmptyBootstrapServersMapping(t *testing.T) {
7092
setupBootstrapServersMappingTest()
7193

7294
var args []string
73-
Server.ParseFlags(args)
95+
_ = Server.ParseFlags(args)
7496
err := Server.PreRunE(nil, args)
7597
a := assert.New(t)
7698
a.Error(err, "list of bootstrap-server-mapping must not be empty")
@@ -79,10 +101,10 @@ func TestEmptyBootstrapServersMapping(t *testing.T) {
79101
func TestBootstrapServersMappingFromEnvWithWhiteSpaces(t *testing.T) {
80102
setupBootstrapServersMappingTest()
81103

82-
os.Setenv("BOOTSTRAP_SERVER_MAPPING", " 192.168.99.100:32404,0.0.0.0:32404 kafka-5.example.com:9092,0.0.0.0:32405,kafka-5.grepplabs.com:9092 ")
104+
_ = os.Setenv("BOOTSTRAP_SERVER_MAPPING", " 192.168.99.100:32404,0.0.0.0:32404 kafka-5.example.com:9092,0.0.0.0:32405,kafka-5.grepplabs.com:9092 ")
83105

84106
var args []string
85-
Server.ParseFlags(args)
107+
_ = Server.ParseFlags(args)
86108
err := Server.PreRunE(nil, args)
87109
a := assert.New(t)
88110
a.Nil(err)
@@ -101,11 +123,11 @@ func TestBootstrapServersMappingFromEnvWithWhiteSpaces(t *testing.T) {
101123
func TestExternalServersMappingFromEnv(t *testing.T) {
102124
setupBootstrapServersMappingTest()
103125

104-
os.Setenv("BOOTSTRAP_SERVER_MAPPING", " 192.168.99.100:32401,0.0.0.0:32401")
105-
os.Setenv("EXTERNAL_SERVER_MAPPING", " 192.168.99.100:32404,0.0.0.0:32404 kafka-5.example.com:9092,0.0.0.0:32405,kafka-5.grepplabs.com:9092")
126+
_ = os.Setenv("BOOTSTRAP_SERVER_MAPPING", " 192.168.99.100:32401,0.0.0.0:32401")
127+
_ = os.Setenv("EXTERNAL_SERVER_MAPPING", " 192.168.99.100:32404,0.0.0.0:32404 kafka-5.example.com:9092,0.0.0.0:32405,kafka-5.grepplabs.com:9092")
106128

107129
var args []string
108-
Server.ParseFlags(args)
130+
_ = Server.ParseFlags(args)
109131
err := Server.PreRunE(nil, args)
110132
a := assert.New(t)
111133
a.Nil(err)

config/config.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ type ListenerConfig struct {
2424
ListenerAddress string
2525
AdvertisedAddress string
2626
}
27+
type DialAddressMapping struct {
28+
SourceAddress string
29+
DestinationAddress string
30+
}
2731

2832
type Config struct {
2933
Http struct {
@@ -46,6 +50,7 @@ type Config struct {
4650
DefaultListenerIP string
4751
BootstrapServers []ListenerConfig
4852
ExternalServers []ListenerConfig
53+
DialAddressMappings []DialAddressMapping
4954
DisableDynamicListeners bool
5055
RequestBufferSize int
5156
ResponseBufferSize int
@@ -146,11 +151,17 @@ func (c *Config) InitBootstrapServers(bootstrapServersMapping []string) (err err
146151
c.Proxy.BootstrapServers, err = getListenerConfigs(bootstrapServersMapping)
147152
return err
148153
}
154+
149155
func (c *Config) InitExternalServers(externalServersMapping []string) (err error) {
150156
c.Proxy.ExternalServers, err = getListenerConfigs(externalServersMapping)
151157
return err
152158
}
153159

160+
func (c *Config) InitDialAddressMappings(dialMappings []string) (err error) {
161+
c.Proxy.DialAddressMappings, err = getDialAddressMappings(dialMappings)
162+
return err
163+
}
164+
154165
func (c *Config) InitSASLCredentials() (err error) {
155166
if c.Kafka.SASL.JaasConfigFile != "" {
156167
credentials, err := NewJaasCredentialFromFile(c.Kafka.SASL.JaasConfigFile)
@@ -162,6 +173,30 @@ func (c *Config) InitSASLCredentials() (err error) {
162173
}
163174
return nil
164175
}
176+
func getDialAddressMappings(dialMapping []string) ([]DialAddressMapping, error) {
177+
dialMappings := make([]DialAddressMapping, 0)
178+
if dialMapping != nil {
179+
for _, v := range dialMapping {
180+
pair := strings.Split(v, ",")
181+
if len(pair) != 2 {
182+
return nil, errors.New("dial-mapping must be in form 'srchost:srcport,dsthost:dstport'")
183+
}
184+
srcHost, srcPort, err := util.SplitHostPort(pair[0])
185+
if err != nil {
186+
return nil, err
187+
}
188+
dstHost, dstPort, err := util.SplitHostPort(pair[1])
189+
if err != nil {
190+
return nil, err
191+
}
192+
dialMapping := DialAddressMapping{
193+
SourceAddress: net.JoinHostPort(srcHost, fmt.Sprint(srcPort)),
194+
DestinationAddress: net.JoinHostPort(dstHost, fmt.Sprint(dstPort))}
195+
dialMappings = append(dialMappings, dialMapping)
196+
}
197+
}
198+
return dialMappings, nil
199+
}
165200

166201
func getListenerConfigs(serversMapping []string) ([]ListenerConfig, error) {
167202
listenerConfigs := make([]ListenerConfig, 0)

proxy/client.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package proxy
22

33
import (
44
"crypto/tls"
5+
"fmt"
56
"github.com/grepplabs/kafka-proxy/config"
67
"github.com/grepplabs/kafka-proxy/pkg/apis"
78
"github.com/pkg/errors"
@@ -36,6 +37,8 @@ type Client struct {
3637

3738
saslAuthByProxy SASLAuthByProxy
3839
authClient *AuthClient
40+
41+
dialAddressMapping map[string]config.DialAddressMapping
3942
}
4043

4144
func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.NetAddressMappingFunc, localPasswordAuthenticator apis.PasswordAuthenticator, localTokenAuthenticator apis.TokenInfo, saslTokenProvider apis.TokenProvider, gatewayTokenProvider apis.TokenProvider, gatewayTokenInfo apis.TokenInfo) (*Client, error) {
@@ -105,6 +108,10 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
105108
return nil, errors.Errorf("SASL Mechanism not valid '%s'", c.Kafka.SASL.Method)
106109
}
107110
}
111+
dialAddressMapping, err := getAddressToDialAddressMapping(c)
112+
if err != nil {
113+
return nil, err
114+
}
108115

109116
return &Client{conns: conns, config: c, dialer: dialer, tcpConnOptions: tcpConnOptions, stopRun: make(chan struct{}, 1),
110117
saslAuthByProxy: saslAuthByProxy,
@@ -136,7 +143,25 @@ func NewClient(conns *ConnSet, c *config.Config, netAddressMappingFunc config.Ne
136143
tokenInfo: gatewayTokenInfo,
137144
},
138145
ForbiddenApiKeys: forbiddenApiKeys,
139-
}}, nil
146+
},
147+
dialAddressMapping: dialAddressMapping,
148+
}, nil
149+
}
150+
151+
func getAddressToDialAddressMapping(cfg *config.Config) (map[string]config.DialAddressMapping, error) {
152+
addressToDialAddressMapping := make(map[string]config.DialAddressMapping)
153+
154+
for _, v := range cfg.Proxy.DialAddressMappings {
155+
if lc, ok := addressToDialAddressMapping[v.SourceAddress]; ok {
156+
if lc.SourceAddress != v.SourceAddress || lc.DestinationAddress != v.DestinationAddress {
157+
return nil, fmt.Errorf("dial address mapping %s configured twice: %v and %v", v.SourceAddress, v, lc)
158+
}
159+
continue
160+
}
161+
logrus.Infof("Dial address mapping src %s dst %s", v.SourceAddress, v.DestinationAddress)
162+
addressToDialAddressMapping[v.SourceAddress] = v
163+
}
164+
return addressToDialAddressMapping, nil
140165
}
141166

142167
func newDialer(c *config.Config, tlsConfig *tls.Config) (Dialer, error) {
@@ -219,9 +244,15 @@ func (c *Client) Close() {
219244
func (c *Client) handleConn(conn Conn) {
220245
proxyConnectionsTotal.WithLabelValues(conn.BrokerAddress).Inc()
221246

222-
server, err := c.DialAndAuth(conn.BrokerAddress)
247+
dialAddress := conn.BrokerAddress
248+
if addressMapping, ok := c.dialAddressMapping[dialAddress]; ok {
249+
dialAddress = addressMapping.DestinationAddress
250+
logrus.Infof("Dial address changed from %s to %s", conn.BrokerAddress, dialAddress)
251+
}
252+
253+
server, err := c.DialAndAuth(dialAddress)
223254
if err != nil {
224-
logrus.Infof("couldn't connect to %s: %v", conn.BrokerAddress, err)
255+
logrus.Infof("couldn't connect to %s(%s): %v", dialAddress, conn.BrokerAddress, err)
225256
_ = conn.LocalConnection.Close()
226257
return
227258
}

proxy/proxy.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,11 @@ func (p *Listeners) GetNetAddressMapping(brokerHost string, brokerPort int32) (l
122122
p.lock.RUnlock()
123123

124124
if ok {
125+
logrus.Debugf("Address mappings broker=%s, listener=%s, advertised=%s", listenerConfig.BrokerAddress, listenerConfig.ListenerAddress, listenerConfig.AdvertisedAddress)
125126
return util.SplitHostPort(listenerConfig.AdvertisedAddress)
126127
}
127128
if !p.disableDynamicListeners {
129+
logrus.Infof("Starting dynamic listener for broker %s", brokerAddress)
128130
return p.ListenDynamicInstance(brokerAddress)
129131
}
130132
return "", 0, fmt.Errorf("net address mapping for %s:%d was not found", brokerHost, brokerPort)

proxy/sasl_scram.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (b *SASLSCRAMAuth) sendAndReceiveSASLAuth(conn DeadlineReaderWriter) error
9898

9999
msg, err = scramConversation.Step(string(challenge))
100100
if err != nil {
101-
logrus.Debugf("SASL authentication failed", err)
101+
logrus.Debugf("SASL authentication failed %s", err)
102102
//Logger.Println("SASL authentication failed", err)
103103
return err
104104
}

0 commit comments

Comments
 (0)