|
| 1 | +package proxy |
| 2 | + |
| 3 | +import ( |
| 4 | + "bytes" |
| 5 | + "encoding/hex" |
| 6 | + "github.com/grepplabs/kafka-proxy/proxy/protocol" |
| 7 | + "github.com/pkg/errors" |
| 8 | + "github.com/stretchr/testify/assert" |
| 9 | + "testing" |
| 10 | + "time" |
| 11 | +) |
| 12 | + |
| 13 | +func TestHandleResponse(t *testing.T) { |
| 14 | + netAddressMappingFunc := func(brokerHost string, brokerPort int32) (listenerHost string, listenerPort int32, err error) { |
| 15 | + if brokerHost == "localhost" { |
| 16 | + switch brokerPort { |
| 17 | + case 19092: |
| 18 | + return "0.0.0.0", 30001, nil |
| 19 | + case 29092: |
| 20 | + return "0.0.0.0", 30002, nil |
| 21 | + case 39092: |
| 22 | + return "0.0.0.0", 30003, nil |
| 23 | + } |
| 24 | + } |
| 25 | + return "", 0, errors.Errorf("unexpected broker %s:%d", brokerHost, brokerPort) |
| 26 | + } |
| 27 | + buf := make([]byte, defaultResponseBufferSize) |
| 28 | + tt := []struct { |
| 29 | + name string |
| 30 | + apiKey int16 |
| 31 | + apiVersion int16 |
| 32 | + hexInput string |
| 33 | + hexOutput string |
| 34 | + }{ |
| 35 | + {name: "Produce v5, kafka-client 1.1.1", apiKey: 0, apiVersion: 5, |
| 36 | + hexInput: "0000003f0000001000000001000f746573742d6e6f2d68656164657273000000010000000000000000000000000008ffffffffffffffff000000000000000000000000", |
| 37 | + }, |
| 38 | + {name: "Produce v6, kafka-client 2.0.1", apiKey: 0, apiVersion: 6, |
| 39 | + hexInput: "0000003f0000000e00000001000f746573742d6e6f2d68656164657273000000010000000000000000000000000077ffffffffffffffff000000000000000000000000", |
| 40 | + }, |
| 41 | + {name: "Produce v7, kafka-client 2.2.2", apiKey: 0, apiVersion: 7, |
| 42 | + hexInput: "0000003f0000000e00000001000f746573742d6e6f2d68656164657273000000010000000000000000000000000059ffffffffffffffff000000000000000000000000", |
| 43 | + }, |
| 44 | + {name: "Produce v8", apiKey: 0, apiVersion: 8, |
| 45 | + hexInput: "000000450000000300000001000f746573742d6e6f2d6865616465727300000001000000000000000000000000000affffffffffffffff000000000000000000000000ffff00000000", |
| 46 | + }, |
| 47 | + {name: "Fetch v11, kafka-client 2.3.1", apiKey: 1, apiVersion: 11, |
| 48 | + hexInput: "0000003d0000000200000000000000010011746f7069632d73746172742d6f6c642d3200000001000000000000ffffffffffffffff000000000000000000000000", |
| 49 | + }, |
| 50 | + {name: "ListOffsets v5, kafka-client 2.3.1", apiKey: 2, apiVersion: 5, |
| 51 | + hexInput: "0000003d0000000200000000000000010011746f7069632d73746172742d6f6c642d3200000001000000000000ffffffffffffffff000000000000000000000000", |
| 52 | + }, |
| 53 | + {name: "Metadata v5, kafka-client 1.1.1", apiKey: 3, apiVersion: 5, |
| 54 | + hexInput: "000000830000000300000000000000030000000200096c6f63616c686f7374000071a4ffff0000000300096c6f63616c686f7374000098b4ffff0000000100096c6f63616c686f737400004a94ffff001641765a42526d583151377567314972466861387a6b4100000001000000010005000f746573742d6e6f2d686561646572730000000000", |
| 55 | + hexOutput: "0000007d000000030000000000000003000000020007302e302e302e3000007532ffff000000030007302e302e302e3000007533ffff000000010007302e302e302e3000007531ffff001641765a42526d583151377567314972466861387a6b4100000001000000010005000f746573742d6e6f2d686561646572730000000000", |
| 56 | + }, |
| 57 | + {name: "Metadata v6, kafka-client 2.0.1", apiKey: 3, apiVersion: 6, |
| 58 | + hexInput: "000000a10000000300000000000000030000000200096c6f63616c686f7374000071a4ffff0000000300096c6f63616c686f7374000098b4ffff0000000100096c6f63616c686f737400004a94ffff001661675a5354684564533236587a3343566a6944614b5100000003000000010000000f746573742d6e6f2d686561646572730000000001000000000000000000030000000100000003000000010000000300000000", |
| 59 | + hexOutput: "0000009b000000030000000000000003000000020007302e302e302e3000007532ffff000000030007302e302e302e3000007533ffff000000010007302e302e302e3000007531ffff001661675a5354684564533236587a3343566a6944614b5100000003000000010000000f746573742d6e6f2d686561646572730000000001000000000000000000030000000100000003000000010000000300000000", |
| 60 | + }, |
| 61 | + {name: "Metadata v7, kafka-client 2.1.1", apiKey: 3, apiVersion: 7, |
| 62 | + hexInput: "000000a50000000200000000000000030000000200096c6f63616c686f7374000071a4ffff0000000300096c6f63616c686f7374000098b4ffff0000000100096c6f63616c686f737400004a94ffff001661675a5354684564533236587a3343566a6944614b5100000003000000010000000f746573742d6e6f2d68656164657273000000000100000000000000000003000000000000000100000003000000010000000300000000", |
| 63 | + hexOutput: "0000009f000000020000000000000003000000020007302e302e302e3000007532ffff000000030007302e302e302e3000007533ffff000000010007302e302e302e3000007531ffff001661675a5354684564533236587a3343566a6944614b5100000003000000010000000f746573742d6e6f2d68656164657273000000000100000000000000000003000000000000000100000003000000010000000300000000", |
| 64 | + }, |
| 65 | + {name: "Metadata v8, kafka-client 2.3.1", apiKey: 3, apiVersion: 8, |
| 66 | + hexInput: "000000ad0000000100000000000000030000000200096c6f63616c686f7374000071a4ffff0000000300096c6f63616c686f7374000098b4ffff0000000100096c6f63616c686f737400004a94ffff001661675a5354684564533236587a3343566a6944614b5100000003000000010000000f746573742d6e6f2d686561646572730000000001000000000000000000030000000000000001000000030000000100000003000000000000000000000000", |
| 67 | + hexOutput: "000000a7000000010000000000000003000000020007302e302e302e3000007532ffff000000030007302e302e302e3000007533ffff000000010007302e302e302e3000007531ffff001661675a5354684564533236587a3343566a6944614b5100000003000000010000000f746573742d6e6f2d686561646572730000000001000000000000000000030000000000000001000000030000000100000003000000000000000000000000", |
| 68 | + }, |
| 69 | + {name: "Metadata v9, kafka-client 2.4.1", apiKey: 3, apiVersion: 9, |
| 70 | + hexInput: "0000009a00000001000000000004000000020a6c6f63616c686f7374000071a40000000000030a6c6f63616c686f7374000098b40000000000010a6c6f63616c686f737400004a9400001761675a5354684564533236587a3343566a6944614b510000000302000010746573742d6e6f2d686561646572730002000000000000000000030000000002000000030200000003010000000000000000000000", |
| 71 | + hexOutput: "00000094000000010000000000040000000208302e302e302e300000753200000000000308302e302e302e300000753300000000000108302e302e302e300000753100001761675a5354684564533236587a3343566a6944614b510000000302000010746573742d6e6f2d686561646572730002000000000000000000030000000002000000030200000003010000000000000000000000", |
| 72 | + }, |
| 73 | + {name: "OffsetFetch v5, kafka-client 2.3.1", apiKey: 9, apiVersion: 5, |
| 74 | + hexInput: "000000390000000c00000000000000010011746f7069632d73746172742d6f6c642d3200000001000000000000000000000000ffffffff000000000000", |
| 75 | + }, |
| 76 | + {name: "OffsetFetch v6, kafka-client 2.4.1", apiKey: 9, apiVersion: 6, |
| 77 | + hexInput: "000000350000000c00000000000212746f7069632d73746172742d6f6c642d3202000000000000000000000000ffffffff0100000000000000", |
| 78 | + }, |
| 79 | + {name: "DescribeGroups v3, kafka-client 2.3.1", apiKey: 15, apiVersion: 3, |
| 80 | + hexInput: "000000400000000a0000000000000001000000154b61666b614578616d706c65436f6e73756d6572320005456d7074790008636f6e73756d657200000000000000000000", |
| 81 | + }, |
| 82 | + {name: "ListGroups v2, kafka-client 2.3.1", apiKey: 16, apiVersion: 2, |
| 83 | + hexInput: "0000002f000000040000000000000000000100154b61666b614578616d706c65436f6e73756d6572320008636f6e73756d6572", |
| 84 | + }, |
| 85 | + {name: "FindCoordinator v2, kafka-client 2.3.1", apiKey: 10, apiVersion: 2, |
| 86 | + hexInput: "000000230000000b00000000000000044e4f4e450000000200096c6f63616c686f7374000071a4", |
| 87 | + hexOutput: "000000210000000b00000000000000044e4f4e45000000020007302e302e302e3000007532", |
| 88 | + }, |
| 89 | + {name: "FindCoordinator v3, kafka-client 2.4.1", apiKey: 10, apiVersion: 3, |
| 90 | + hexInput: "000000230000000900000000000000054e4f4e45000000020a6c6f63616c686f7374000071a400", |
| 91 | + hexOutput: "000000210000000900000000000000054e4f4e450000000208302e302e302e300000753200", |
| 92 | + }, |
| 93 | + {name: "SaslHandshake v1, kafka-client 2.3.1 ", apiKey: 17, apiVersion: 1, |
| 94 | + hexInput: "00000011000000000000000000010005504c41494e", |
| 95 | + }, |
| 96 | + {name: "ApiVersions v1, kafka-client 1.1.1", apiKey: 18, apiVersion: 1, |
| 97 | + hexInput: "0000012e0000000000000000003000000000000800010000000b000200000005000300000009000400000004000500000002000600000006000700000003000800000008000900000006000a00000003000b00000006000c00000004000d00000004000e00000004000f00000005001000000003001100000001001200000003001300000005001400000004001500000001001600000002001700000003001800000001001900000001001a00000001001b00000000001c00000002001d00000001001e00000001001f00000001002000000002002100000001002200000001002300000001002400000001002500000001002600000002002700000001002800000001002900000001002a00000002002b00000002002c00000001002d00000000002e00000000002f0000000000000000", |
| 98 | + }, |
| 99 | + {name: "ApiVersions v2, kafka-client 2.2.2", apiKey: 18, apiVersion: 2, |
| 100 | + hexInput: "0000012e0000000400000000003000000000000800010000000b000200000005000300000009000400000004000500000002000600000006000700000003000800000008000900000006000a00000003000b00000006000c00000004000d00000004000e00000004000f00000005001000000003001100000001001200000003001300000005001400000004001500000001001600000002001700000003001800000001001900000001001a00000001001b00000000001c00000002001d00000001001e00000001001f00000001002000000002002100000001002200000001002300000001002400000001002500000001002600000002002700000001002800000001002900000001002a00000002002b00000002002c00000001002d00000000002e00000000002f0000000000000000", |
| 101 | + }, |
| 102 | + {name: "ApiVersions v3, kafka-client 2.4.1 ", apiKey: 18, apiVersion: 3, |
| 103 | + hexInput: "0000015c000000030000310000000000080000010000000b000002000000050000030000000900000400000004000005000000020000060000000600000700000003000008000000080000090000000600000a0000000300000b0000000600000c0000000400000d0000000400000e0000000400000f000000050000100000000300001100000001000012000000030000130000000500001400000004000015000000010000160000000200001700000003000018000000010000190000000100001a0000000100001b0000000000001c0000000200001d0000000100001e0000000100001f000000010000200000000200002100000001000022000000010000230000000100002400000001000025000000010000260000000200002700000001000028000000010000290000000100002a0000000200002b0000000200002c0000000100002d0000000000002e0000000000002f00000000000000000000", |
| 104 | + }, |
| 105 | + {name: "DescribeAcls v2, kafka-client 2.4.1 ", apiKey: 29, apiVersion: 1, |
| 106 | + hexInput: "000000390000000300000000003600294e6f20417574686f72697a657220697320636f6e66696775726564206f6e207468652062726f6b657200000000", |
| 107 | + }, |
| 108 | + {name: "SaslAuthenticate v1, kafka-client 2.3.1 ", apiKey: 36, apiVersion: 1, |
| 109 | + hexInput: "0000000d00000003000000000000000100", |
| 110 | + }, |
| 111 | + {name: "ElectLeaders v2, kafka-client 2.4.1 ", apiKey: 43, apiVersion: 2, |
| 112 | + hexInput: "0000000d00000003000000000000000100", |
| 113 | + }, |
| 114 | + } |
| 115 | + for _, tc := range tt { |
| 116 | + |
| 117 | + input, err := hex.DecodeString(tc.hexInput) |
| 118 | + if err != nil { |
| 119 | + t.Fatal(err) |
| 120 | + } |
| 121 | + var expected []byte |
| 122 | + if tc.hexOutput != "" { |
| 123 | + expected, err = hex.DecodeString(tc.hexOutput) |
| 124 | + if err != nil { |
| 125 | + t.Fatal(err) |
| 126 | + } |
| 127 | + } else { |
| 128 | + expected = input |
| 129 | + } |
| 130 | + |
| 131 | + readBuffer := bytes.NewBuffer(input) |
| 132 | + src := &TestDeadlineReader{ |
| 133 | + Buffer: readBuffer, |
| 134 | + } |
| 135 | + output := bytes.NewBuffer(make([]byte, 0)) |
| 136 | + dst := &TestDeadlineWriter{ |
| 137 | + Buffer: output, |
| 138 | + } |
| 139 | + |
| 140 | + openRequestsChannel := make(chan protocol.RequestKeyVersion, 1) |
| 141 | + openRequestsChannel <- protocol.RequestKeyVersion{ApiKey: tc.apiKey, ApiVersion: tc.apiVersion} |
| 142 | + |
| 143 | + ctx := &ResponsesLoopContext{openRequestsChannel: openRequestsChannel, timeout: 1 * time.Second, buf: buf, netAddressMappingFunc: netAddressMappingFunc} |
| 144 | + |
| 145 | + a := assert.New(t) |
| 146 | + handler := &DefaultResponseHandler{} |
| 147 | + _, err = handler.handleResponse(dst, src, ctx) |
| 148 | + if err != nil { |
| 149 | + t.Fatal(err) |
| 150 | + } |
| 151 | + a.Equal(expected, output.Bytes()) |
| 152 | + a.Empty(readBuffer.Bytes()) // check all bytes from input has been read |
| 153 | + } |
| 154 | +} |
| 155 | + |
| 156 | +type TestDeadlineWriter struct { |
| 157 | + *bytes.Buffer |
| 158 | +} |
| 159 | + |
| 160 | +func (w *TestDeadlineWriter) SetWriteDeadline(t time.Time) error { |
| 161 | + return nil |
| 162 | +} |
| 163 | + |
| 164 | +type TestDeadlineReader struct { |
| 165 | + *bytes.Buffer |
| 166 | +} |
| 167 | + |
| 168 | +func (w *TestDeadlineReader) SetReadDeadline(t time.Time) error { |
| 169 | + return nil |
| 170 | +} |
0 commit comments