Skip to content

Commit c227df5

Browse files
committed
Add Metadata Response (Version: 8)
1 parent 1df00fa commit c227df5

File tree

3 files changed

+290
-3
lines changed

3 files changed

+290
-3
lines changed

README.md

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -489,8 +489,8 @@ Use localhost:32400, localhost:32401 and localhost:32402 as bootstrap servers
489489

490490

491491
### Connect to Kafka running in Kubernetes example (kafka proxy runs locally)
492-
493-
kafka.properties of one node Kafka
492+
#### one node Kafka cluster
493+
kafka.properties
494494

495495
```
496496
broker.id=0
@@ -508,6 +508,63 @@ kafka-proxy server --bootstrap-server-mapping "127.0.0.1:9092,0.0.0.0:19092" --d
508508

509509
Use localhost:19092 as bootstrap servers
510510

511+
#### 3 nodes Kafka cluster
512+
513+
[strimzi 0.13.0 CRD](https://strimzi.io/)
514+
515+
```yaml
516+
apiVersion: kafka.strimzi.io/v1beta1
517+
kind: Kafka
518+
metadata:
519+
name: test-cluster
520+
namespace: kafka
521+
spec:
522+
kafka:
523+
version: 2.3.0
524+
replicas: 3
525+
listeners:
526+
plain: {}
527+
tls: {}
528+
config:
529+
offsets.topic.replication.factor: 3
530+
transaction.state.log.replication.factor: 3
531+
transaction.state.log.min.isr: 2
532+
num.partitions: 60
533+
default.replication.factor: 3
534+
storage:
535+
type: jbod
536+
volumes:
537+
- id: 0
538+
type: persistent-claim
539+
size: 20Gi
540+
deleteClaim: true
541+
zookeeper:
542+
replicas: 3
543+
storage:
544+
type: persistent-claim
545+
size: 5Gi
546+
deleteClaim: true
547+
entityOperator:
548+
topicOperator: {}
549+
userOperator: {}
550+
```
551+
552+
```bash
553+
kubectl port-forward -n kafka test-cluster-kafka-0 9092:9092
554+
kubectl port-forward -n kafka test-cluster-kafka-1 9093:9092
555+
kubectl port-forward -n kafka test-cluster-kafka-2 9094:9092
556+
557+
kafka-proxy server --log-level debug \
558+
--bootstrap-server-mapping "127.0.0.1:9092,0.0.0.0:19092" \
559+
--bootstrap-server-mapping "127.0.0.1:9093,0.0.0.0:19093" \
560+
--bootstrap-server-mapping "127.0.0.1:9094,0.0.0.0:19094" \
561+
--dial-address-mapping "test-cluster-kafka-0.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9092" \
562+
--dial-address-mapping "test-cluster-kafka-1.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9093" \
563+
--dial-address-mapping "test-cluster-kafka-2.test-cluster-kafka-brokers.kafka.svc.cluster.local:9092,0.0.0.0:9094"
564+
```
565+
566+
Use localhost:19092 as bootstrap servers
567+
511568
### Embedded third-party source code
512569

513570
* [Cloud SQL Proxy](https://github.com/GoogleCloudPlatform/cloudsql-proxy)

proxy/protocol/responses.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ func createMetadataResponseSchemaVersions() []Schema {
9797
&array{name: "partition_metadata", ty: partitionMetadataV7},
9898
)
9999

100+
topicMetadataV8 := NewSchema("topic_metadata_v8",
101+
&field{name: "error_code", ty: typeInt16},
102+
&field{name: "name", ty: typeStr},
103+
&field{name: "is_internal", ty: typeBool},
104+
&array{name: "partition_metadata", ty: partitionMetadataV7},
105+
&field{name: "topic_authorized_operations", ty: typeInt32},
106+
)
107+
100108
metadataResponseV1 := NewSchema("metadata_response_v1",
101109
&array{name: brokersKeyName, ty: metadataBrokerV1},
102110
&field{name: "controller_id", ty: typeInt32},
@@ -138,7 +146,16 @@ func createMetadataResponseSchemaVersions() []Schema {
138146
&array{name: "topic_metadata", ty: topicMetadataV7},
139147
)
140148

141-
return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7}
149+
metadataResponseV8 := NewSchema("metadata_response_v8",
150+
&field{name: "throttle_time_ms", ty: typeInt32},
151+
&array{name: brokersKeyName, ty: metadataBrokerV1},
152+
&field{name: "cluster_id", ty: typeNullableStr},
153+
&field{name: "controller_id", ty: typeInt32},
154+
&array{name: "topic_metadata", ty: topicMetadataV8},
155+
&field{name: "cluster_authorized_operations", ty: typeInt32},
156+
)
157+
158+
return []Schema{metadataResponseV0, metadataResponseV1, metadataResponseV2, metadataResponseV3, metadataResponseV4, metadataResponseV5, metadataResponseV6, metadataResponseV7, metadataResponseV8}
142159
}
143160

144161
func createFindCoordinatorResponseSchemaVersions() []Schema {

proxy/protocol/responses_test.go

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1718,6 +1718,219 @@ func TestMetadataResponseV7(t *testing.T) {
17181718
}
17191719
a.Equal(expected, dc.AttrValues())
17201720
}
1721+
1722+
func TestMetadataResponseV8(t *testing.T) {
1723+
/*
1724+
Metadata Response (Version: 8) => throttle_time_ms [brokers] cluster_id controller_id [topics] cluster_authorized_operations
1725+
throttle_time_ms => INT32
1726+
brokers => node_id host port rack
1727+
node_id => INT32
1728+
host => STRING
1729+
port => INT32
1730+
rack => NULLABLE_STRING
1731+
cluster_id => NULLABLE_STRING
1732+
controller_id => INT32
1733+
topics => error_code name is_internal [partitions] topic_authorized_operations
1734+
error_code => INT16
1735+
name => STRING
1736+
is_internal => BOOLEAN
1737+
partitions => error_code partition_index leader_id leader_epoch [replica_nodes] [isr_nodes] [offline_replicas]
1738+
error_code => INT16
1739+
partition_index => INT32
1740+
leader_id => INT32
1741+
leader_epoch => INT32
1742+
replica_nodes => INT32
1743+
isr_nodes => INT32
1744+
offline_replicas => INT32
1745+
topic_authorized_operations => INT32
1746+
cluster_authorized_operations => INT32
1747+
*/
1748+
1749+
apiVersion := int16(8)
1750+
1751+
bytes := []byte{
1752+
// throttle_time_ms
1753+
0x00, 0x00, 0x00, 0x01, // 1
1754+
// brokers
1755+
0x00, 0x00, 0x00, 0x03,
1756+
// brokers[0]
1757+
0x00, 0x00, 0xab, 0xff, // 44031
1758+
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
1759+
0x00, 0x00, 0x00, 0x33, // 51
1760+
0x00, 0x00, // ''
1761+
// brokers[1]
1762+
0x00, 0x01, 0x02, 0x03, // 66051
1763+
0x00, 0x0a, 'g', 'o', 'o', 'g', 'l', 'e', '.', 'c', 'o', 'm',
1764+
0x00, 0x00, 0x01, 0x11, // 273
1765+
0x00, 0x07, 'e', 'u', 'w', 'e', 's', 't', '1',
1766+
// brokers[2]
1767+
0x00, 0x00, 0x00, 0x02, // 2
1768+
0x00, 0x09, 'k', 'a', 'f', 'k', 'a', '.', 'o', 'r', 'g',
1769+
0x00, 0x00, 0xd0, 0xff, // 53503
1770+
0xff, 0xff, // -1 is nil'
1771+
1772+
// cluster_id
1773+
0x00, 0x07, 'm', 'y', 'k', 'a', 'f', 'k', 'a',
1774+
1775+
// controller_id
1776+
0x00, 0x00, 0xe1, 0xb2, // 57778
1777+
1778+
// topic_metadata
1779+
0x00, 0x00, 0x00, 0x02,
1780+
1781+
// topic_metadata[0]
1782+
0x00, 0x00,
1783+
0x00, 0x03, 'f', 'o', 'o',
1784+
0x01, // true
1785+
// partition_metadata
1786+
0x00, 0x00, 0x00, 0x01,
1787+
0x00, 0x04,
1788+
0x00, 0x00, 0x00, 0x01,
1789+
0x00, 0x00, 0x00, 0x07,
1790+
0x00, 0x00, 0x00, 0x08,
1791+
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03,
1792+
0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x02,
1793+
0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x05, 0x00, 0x00, 0x00, 0x06, 0x00, 0x00, 0x00, 0x07,
1794+
0x00, 0x00, 0x00, 0x08, // topic_authorized_operations 8
1795+
// topic_metadata[1]
1796+
0x00, 0x00,
1797+
0x00, 0x03, 'b', 'a', 'r',
1798+
0x00, // false
1799+
// partition_metadata
1800+
0x00, 0x00, 0x00, 0x00,
1801+
0x00, 0x00, 0x00, 0x04, // topic_authorized_operations 4
1802+
// cluster_authorized_operations 5
1803+
0x00, 0x00, 0x00, 0x05,
1804+
}
1805+
1806+
a := assert.New(t)
1807+
1808+
schema := metadataResponseSchemaVersions[apiVersion]
1809+
1810+
s, err := DecodeSchema(bytes, schema)
1811+
a.Nil(err)
1812+
1813+
dc := NewDecodeCheck()
1814+
dc.Traverse(s)
1815+
1816+
expected := []string{
1817+
"throttle_time_ms int32 1",
1818+
"[brokers]",
1819+
"brokers struct",
1820+
"node_id int32 44031",
1821+
"host string localhost",
1822+
"port int32 51",
1823+
"rack *string ",
1824+
"brokers struct",
1825+
"node_id int32 66051",
1826+
"host string google.com",
1827+
"port int32 273",
1828+
"rack *string euwest1",
1829+
"brokers struct",
1830+
"node_id int32 2",
1831+
"host string kafka.org",
1832+
"port int32 53503",
1833+
"rack *string <nil>",
1834+
"cluster_id *string mykafka",
1835+
"controller_id int32 57778",
1836+
"[topic_metadata]",
1837+
"topic_metadata struct",
1838+
"error_code int16 0",
1839+
"name string foo",
1840+
"is_internal bool true",
1841+
"[partition_metadata]",
1842+
"partition_metadata struct",
1843+
"error_code int16 4",
1844+
"partition int32 1",
1845+
"leader int32 7",
1846+
"leader_epoch int32 8",
1847+
"[replicas]",
1848+
"replicas int32 1",
1849+
"replicas int32 2",
1850+
"replicas int32 3",
1851+
"[isr]",
1852+
"isr int32 3",
1853+
"isr int32 2",
1854+
"[offline_replicas]",
1855+
"offline_replicas int32 5",
1856+
"offline_replicas int32 6",
1857+
"offline_replicas int32 7",
1858+
"topic_authorized_operations int32 8",
1859+
"topic_metadata struct",
1860+
"error_code int16 0",
1861+
"name string bar",
1862+
"is_internal bool false",
1863+
"[partition_metadata]",
1864+
"topic_authorized_operations int32 4",
1865+
"cluster_authorized_operations int32 5",
1866+
}
1867+
a.Equal(expected, dc.AttrValues())
1868+
resp, err := EncodeSchema(s, schema)
1869+
a.Nil(err)
1870+
a.Equal(bytes, resp)
1871+
1872+
modifier, err := GetResponseModifier(apiKeyMetadata, apiVersion, testResponseModifier)
1873+
a.Nil(err)
1874+
resp, err = modifier.Apply(resp)
1875+
a.Nil(err)
1876+
s, err = DecodeSchema(resp, schema)
1877+
a.Nil(err)
1878+
dc = NewDecodeCheck()
1879+
dc.Traverse(s)
1880+
expected = []string{
1881+
"throttle_time_ms int32 1",
1882+
"[brokers]",
1883+
"brokers struct",
1884+
"node_id int32 44031",
1885+
"host string myhost1", // replaced
1886+
"port int32 34001", // replaced
1887+
"rack *string ",
1888+
"brokers struct",
1889+
"node_id int32 66051",
1890+
"host string myhost2", // replaced
1891+
"port int32 34002", // replaced
1892+
"rack *string euwest1",
1893+
"brokers struct",
1894+
"node_id int32 2",
1895+
"host string myhost3", // replaced
1896+
"port int32 34003", // replaced
1897+
"rack *string <nil>",
1898+
"cluster_id *string mykafka",
1899+
"controller_id int32 57778",
1900+
"[topic_metadata]",
1901+
"topic_metadata struct",
1902+
"error_code int16 0",
1903+
"name string foo",
1904+
"is_internal bool true",
1905+
"[partition_metadata]",
1906+
"partition_metadata struct",
1907+
"error_code int16 4",
1908+
"partition int32 1",
1909+
"leader int32 7",
1910+
"leader_epoch int32 8",
1911+
"[replicas]",
1912+
"replicas int32 1",
1913+
"replicas int32 2",
1914+
"replicas int32 3",
1915+
"[isr]",
1916+
"isr int32 3",
1917+
"isr int32 2",
1918+
"[offline_replicas]",
1919+
"offline_replicas int32 5",
1920+
"offline_replicas int32 6",
1921+
"offline_replicas int32 7",
1922+
"topic_authorized_operations int32 8",
1923+
"topic_metadata struct",
1924+
"error_code int16 0",
1925+
"name string bar",
1926+
"is_internal bool false",
1927+
"[partition_metadata]",
1928+
"topic_authorized_operations int32 4",
1929+
"cluster_authorized_operations int32 5",
1930+
1931+
}
1932+
a.Equal(expected, dc.AttrValues())
1933+
}
17211934
func TestFindCoordinatorResponseV0(t *testing.T) {
17221935
/*
17231936
FindCoordinator Response (Version: 0) => error_code coordinator

0 commit comments

Comments
 (0)