diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2967e87..68b66eb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -35,7 +35,7 @@ jobs: uses: actions/checkout@v2 - name: Set up Go - uses: actions/setup-go@v1 + uses: actions/setup-go@v5 with: go-version: 1.22.x diff --git a/internal/assertions/apiversions_response_assertion.go b/internal/assertions/apiversions_response_assertion.go index 563bbc7..d8843c3 100644 --- a/internal/assertions/apiversions_response_assertion.go +++ b/internal/assertions/apiversions_response_assertion.go @@ -50,13 +50,18 @@ func (a ApiVersionsResponseAssertion) Evaluate(fields []string, AssertApiVersion for _, actualApiVersionKey := range a.ActualValue.ApiKeys { if actualApiVersionKey.ApiKey == expectedApiVersionKey.ApiKey { found = true - if actualApiVersionKey.MinVersion > expectedApiVersionKey.MinVersion { + if actualApiVersionKey.MaxVersion < expectedApiVersionKey.MinVersion { + return fmt.Errorf("Expected max version %v to be >= min version %v for %s", actualApiVersionKey.MaxVersion, actualApiVersionKey.MinVersion, apiKeyNames[expectedApiVersionKey.ApiKey]) + } + + // anything above or equal to expected minVersion is fine + if actualApiVersionKey.MinVersion < expectedApiVersionKey.MinVersion { return fmt.Errorf("Expected API version %v to be supported for %s, got %v", expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MaxVersion) } logger.Successf("✓ API version %v is supported for %s", actualApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey]) - if actualApiVersionKey.MinVersion < expectedApiVersionKey.MinVersion { - return fmt.Errorf("Expected API version %v to be supported for %s, got %v", expectedApiVersionKey.MinVersion, apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MinVersion) + if actualApiVersionKey.MaxVersion < expectedApiVersionKey.MaxVersion { + return fmt.Errorf("Expected API version %v to be supported for %s, got %v", expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MaxVersion) } logger.Successf("✓ API version %v is supported for %s", actualApiVersionKey.MinVersion, apiKeyNames[expectedApiVersionKey.ApiKey]) } diff --git a/internal/cluster_metadata_payload_test.go b/internal/cluster_metadata_payload_test.go index 127956c..48eadba 100644 --- a/internal/cluster_metadata_payload_test.go +++ b/internal/cluster_metadata_payload_test.go @@ -6,13 +6,11 @@ import ( "testing" kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api" - "github.com/codecrafters-io/kafka-tester/protocol/decoder" - "github.com/codecrafters-io/kafka-tester/protocol/encoder" "github.com/codecrafters-io/kafka-tester/protocol/serializer" "github.com/stretchr/testify/assert" ) -func TestBeginTxnRecord(t *testing.T) { +func TestDecodeBeginTransactionRecordPayload(t *testing.T) { hexdump := "01170001001212426f6f747374726170207265636f726473" b, err := hex.DecodeString(hexdump) @@ -20,47 +18,20 @@ func TestBeginTxnRecord(t *testing.T) { panic(err) } - decoder := decoder.RealDecoder{} - decoder.Init(b) + beginTransactionRecord := kafkaapi.ClusterMetadataPayload{} + err = beginTransactionRecord.Decode(b) - frame, err := decoder.GetInt8() - fmt.Printf("frame: %d\n", frame) assert.NoError(t, err) + assert.EqualValues(t, 1, beginTransactionRecord.FrameVersion) + assert.EqualValues(t, 23, beginTransactionRecord.Type) + assert.EqualValues(t, 0, beginTransactionRecord.Version) - typ, err := decoder.GetInt8() - fmt.Printf("typ: %d\n", typ) - assert.NoError(t, err) - - version, err := decoder.GetInt8() - fmt.Printf("version: %d\n", version) - assert.NoError(t, err) - - tagFieldCount, err := decoder.GetUnsignedVarint() - fmt.Printf("tagFieldCount: %d\n", tagFieldCount) - assert.NoError(t, err) - - for i := 0; i < int(tagFieldCount); i++ { - tagType, err := decoder.GetUnsignedVarint() - fmt.Printf("tagType: %d\n", tagType) - assert.NoError(t, err) - - tagLength, err := decoder.GetUnsignedVarint() - fmt.Printf("tagLength: %d\n", tagLength) - assert.NoError(t, err) - - stringLength, err := decoder.GetUnsignedVarint() - fmt.Printf("stringLength: %d\n", stringLength) - assert.NoError(t, err) - - stringValue, err := decoder.GetRawBytes(int(stringLength) - 1) - fmt.Printf("stringValue: %s\n", stringValue) - assert.NoError(t, err) - } - - assert.Equal(t, 0, int(decoder.Remaining())) + payload, ok := beginTransactionRecord.Data.(*kafkaapi.BeginTransactionRecord) + assert.True(t, ok) + assert.EqualValues(t, "Bootstrap records", payload.Name) } -func TestFeatureLevelRecord(t *testing.T) { +func TestDecodeFeatureLevelRecordPayload(t *testing.T) { hexdump := "010c00116d657461646174612e76657273696f6e001400" b, err := hex.DecodeString(hexdump) @@ -68,41 +39,20 @@ func TestFeatureLevelRecord(t *testing.T) { panic(err) } - decoder := decoder.RealDecoder{} - decoder.Init(b) - - frame, err := decoder.GetInt8() - fmt.Printf("frame: %d\n", frame) - assert.NoError(t, err) - - typ, err := decoder.GetInt8() - fmt.Printf("typ: %d\n", typ) - assert.NoError(t, err) - - version, err := decoder.GetInt8() - fmt.Printf("version: %d\n", version) - assert.NoError(t, err) - - stringLength, err := decoder.GetUnsignedVarint() - fmt.Printf("stringLength: %d\n", stringLength) - assert.NoError(t, err) - - stringValue, err := decoder.GetRawBytes(int(stringLength) - 1) - fmt.Printf("stringValue: %s\n", stringValue) - assert.NoError(t, err) - - featureLevel, err := decoder.GetInt16() - fmt.Printf("featureLevel: %d\n", featureLevel) - assert.NoError(t, err) + featureLevelRecord := kafkaapi.ClusterMetadataPayload{} + err = featureLevelRecord.Decode(b) - tagFieldCount, err := decoder.GetUnsignedVarint() - fmt.Printf("tagFieldCount: %d\n", tagFieldCount) assert.NoError(t, err) + assert.EqualValues(t, 1, featureLevelRecord.FrameVersion) + assert.EqualValues(t, 12, featureLevelRecord.Type) + assert.EqualValues(t, 0, featureLevelRecord.Version) - assert.Equal(t, 0, int(decoder.Remaining())) + payload, ok := featureLevelRecord.Data.(*kafkaapi.FeatureLevelRecord) + assert.True(t, ok) + assert.EqualValues(t, "metadata.version", payload.Name) } -func TestZKMigrationRecord(t *testing.T) { +func TestDecodeZKMigrationRecordPayload(t *testing.T) { hexdump := "0115000000" b, err := hex.DecodeString(hexdump) @@ -110,33 +60,20 @@ func TestZKMigrationRecord(t *testing.T) { panic(err) } - decoder := decoder.RealDecoder{} - decoder.Init(b) + zkMigrationRecord := kafkaapi.ClusterMetadataPayload{} + err = zkMigrationRecord.Decode(b) - frame, err := decoder.GetInt8() - fmt.Printf("frame: %d\n", frame) assert.NoError(t, err) + assert.EqualValues(t, 1, zkMigrationRecord.FrameVersion) + assert.EqualValues(t, 21, zkMigrationRecord.Type) + assert.EqualValues(t, 0, zkMigrationRecord.Version) - typ, err := decoder.GetInt8() - fmt.Printf("typ: %d\n", typ) - assert.NoError(t, err) - - version, err := decoder.GetInt8() - fmt.Printf("version: %d\n", version) - assert.NoError(t, err) - - migrationState, err := decoder.GetInt8() - fmt.Printf("migrationState: %d\n", migrationState) - assert.NoError(t, err) - - tagFieldCount, err := decoder.GetUnsignedVarint() - fmt.Printf("tagFieldCount: %d\n", tagFieldCount) - assert.NoError(t, err) - - assert.Equal(t, 0, int(decoder.Remaining())) + payload, ok := zkMigrationRecord.Data.(*kafkaapi.ZKMigrationStateRecord) + assert.True(t, ok) + assert.EqualValues(t, 0, payload.MigrationState) } -func TestEndTxnRecord(t *testing.T) { +func TestDecodeEndTransactionRecordPayload(t *testing.T) { hexdump := "01180000" b, err := hex.DecodeString(hexdump) @@ -144,29 +81,19 @@ func TestEndTxnRecord(t *testing.T) { panic(err) } - decoder := decoder.RealDecoder{} - decoder.Init(b) - - frame, err := decoder.GetInt8() - fmt.Printf("frame: %d\n", frame) - assert.NoError(t, err) - - typ, err := decoder.GetInt8() - fmt.Printf("typ: %d\n", typ) - assert.NoError(t, err) - - version, err := decoder.GetInt8() - fmt.Printf("version: %d\n", version) - assert.NoError(t, err) + endTransactionRecord := kafkaapi.ClusterMetadataPayload{} + err = endTransactionRecord.Decode(b) - tagFieldCount, err := decoder.GetUnsignedVarint() - fmt.Printf("tagFieldCount: %d\n", tagFieldCount) assert.NoError(t, err) + assert.EqualValues(t, 1, endTransactionRecord.FrameVersion) + assert.EqualValues(t, 24, endTransactionRecord.Type) + assert.EqualValues(t, 0, endTransactionRecord.Version) - assert.Equal(t, 0, int(decoder.Remaining())) + _, ok := endTransactionRecord.Data.(*kafkaapi.EndTransactionRecord) + assert.True(t, ok) } -func TestTopicRecord(t *testing.T) { +func TestDecodeTopicRecordPayload(t *testing.T) { hexdump := "01020004666f6fbfd99e5e3235455281f8d4af1741970c00" b, err := hex.DecodeString(hexdump) @@ -174,41 +101,21 @@ func TestTopicRecord(t *testing.T) { panic(err) } - decoder := decoder.RealDecoder{} - decoder.Init(b) - - frame, err := decoder.GetInt8() - fmt.Printf("frame: %d\n", frame) - assert.NoError(t, err) - - typ, err := decoder.GetInt8() - fmt.Printf("typ: %d\n", typ) - assert.NoError(t, err) - - version, err := decoder.GetInt8() - fmt.Printf("version: %d\n", version) - assert.NoError(t, err) - - stringLength, err := decoder.GetUnsignedVarint() - fmt.Printf("stringLength: %d\n", stringLength) - assert.NoError(t, err) + topicRecord := kafkaapi.ClusterMetadataPayload{} + err = topicRecord.Decode(b) - stringValue, err := decoder.GetRawBytes(int(stringLength) - 1) - fmt.Printf("stringValue: %s\n", stringValue) assert.NoError(t, err) + assert.EqualValues(t, 1, topicRecord.FrameVersion) + assert.EqualValues(t, 2, topicRecord.Type) + assert.EqualValues(t, 0, topicRecord.Version) - topicID, err := getUUID(&decoder) - fmt.Printf("topicID: %s\n", topicID) - assert.NoError(t, err) - - tagFieldCount, err := decoder.GetUnsignedVarint() - fmt.Printf("tagFieldCount: %d\n", tagFieldCount) - assert.NoError(t, err) - - assert.Equal(t, 0, int(decoder.Remaining())) + payload, ok := topicRecord.Data.(*kafkaapi.TopicRecord) + assert.True(t, ok) + assert.EqualValues(t, "foo", payload.TopicName) + assert.EqualValues(t, "bfd99e5e-3235-4552-81f8-d4af1741970c", payload.TopicUUID) } -func TestPartitionRecord(t *testing.T) { +func TestDecodePartitionRecordPayload(t *testing.T) { hexdump := "01030100000000bfd99e5e3235455281f8d4af1741970c020000000102000000010101000000010000000000000000020224973cbadd44cf874445a99619da3400" b, err := hex.DecodeString(hexdump) @@ -216,123 +123,29 @@ func TestPartitionRecord(t *testing.T) { panic(err) } - decoder := decoder.RealDecoder{} - decoder.Init(b) - - frame, err := decoder.GetInt8() - fmt.Printf("frame: %d\n", frame) - assert.NoError(t, err) - - typ, err := decoder.GetInt8() - fmt.Printf("typ: %d\n", typ) - assert.NoError(t, err) - - version, err := decoder.GetInt8() - fmt.Printf("version: %d\n", version) - assert.NoError(t, err) - - partitionID, err := decoder.GetInt32() - fmt.Printf("partitionID: %d\n", partitionID) - assert.NoError(t, err) - - topicID, err := getUUID(&decoder) - fmt.Printf("topicID: %s\n", topicID) - assert.NoError(t, err) - - arrayLength, err := decoder.GetUnsignedVarint() - fmt.Printf("arrayLength: %d\n", arrayLength) - assert.NoError(t, err) - - var replicas []int32 - for i := 0; i < int(arrayLength-1); i++ { - replica, err := decoder.GetInt32() - assert.NoError(t, err) - replicas = append(replicas, replica) - } - fmt.Printf("replicas: %v\n", replicas) - - arrayLength, err = decoder.GetUnsignedVarint() - fmt.Printf("arrayLength: %d\n", arrayLength) - assert.NoError(t, err) - - var inSyncReplicas []int32 - for i := 0; i < int(arrayLength-1); i++ { - replica, err := decoder.GetInt32() - assert.NoError(t, err) - inSyncReplicas = append(inSyncReplicas, replica) - } - fmt.Printf("inSyncReplicas: %v\n", inSyncReplicas) - - arrayLength, err = decoder.GetUnsignedVarint() - fmt.Printf("arrayLength: %d\n", arrayLength) - assert.NoError(t, err) - - var removingReplicas []int32 - for i := 0; i < int(arrayLength-1); i++ { - replica, err := decoder.GetInt32() - assert.NoError(t, err) - removingReplicas = append(removingReplicas, replica) - } - fmt.Printf("removingReplicas: %v\n", removingReplicas) - - arrayLength, err = decoder.GetUnsignedVarint() - fmt.Printf("arrayLength: %d\n", arrayLength) - assert.NoError(t, err) - - var addingReplicas []int32 - for i := 0; i < int(arrayLength-1); i++ { - replica, err := decoder.GetInt32() - assert.NoError(t, err) - addingReplicas = append(addingReplicas, replica) - } - fmt.Printf("addingReplicas: %v\n", addingReplicas) - - leader, err := decoder.GetInt32() - fmt.Printf("leader: %d\n", leader) - assert.NoError(t, err) - - leaderEpoch, err := decoder.GetInt32() - fmt.Printf("leaderEpoch: %d\n", leaderEpoch) - assert.NoError(t, err) - - partitionEpoch, err := decoder.GetInt32() - fmt.Printf("partitionEpoch: %d\n", partitionEpoch) - assert.NoError(t, err) + partitionRecord := kafkaapi.ClusterMetadataPayload{} + err = partitionRecord.Decode(b) - if version >= 1 { - arrayLength, err = decoder.GetUnsignedVarint() - fmt.Printf("arrayLength: %d\n", arrayLength) - assert.NoError(t, err) - - var directories []string - for i := 0; i < int(arrayLength-1); i++ { - replica, err := getUUID(&decoder) - assert.NoError(t, err) - directories = append(directories, replica) - } - fmt.Printf("directories: %v\n", directories) - } - - tagFieldCount, err := decoder.GetUnsignedVarint() - fmt.Printf("tagFieldCount: %d\n", tagFieldCount) assert.NoError(t, err) + assert.EqualValues(t, 1, partitionRecord.FrameVersion) + assert.EqualValues(t, 3, partitionRecord.Type) + assert.EqualValues(t, 1, partitionRecord.Version) - assert.Equal(t, 0, int(decoder.Remaining())) -} - -func getUUID(pd *decoder.RealDecoder) (string, error) { - topicUUIDBytes, err := pd.GetRawBytes(16) - if err != nil { - return "", err - } - topicUUID, err := encoder.DecodeUUID(topicUUIDBytes) - if err != nil { - return "", err - } - return topicUUID, nil + payload, ok := partitionRecord.Data.(*kafkaapi.PartitionRecord) + assert.True(t, ok) + assert.EqualValues(t, 0, payload.PartitionID) + assert.EqualValues(t, "bfd99e5e-3235-4552-81f8-d4af1741970c", payload.TopicUUID) + assert.EqualValues(t, []int32{1}, payload.Replicas) + assert.EqualValues(t, []int32{1}, payload.ISReplicas) + assert.EqualValues(t, []int32{}, payload.RemovingReplicas) + assert.EqualValues(t, []int32{}, payload.AddingReplicas) + assert.EqualValues(t, 1, payload.Leader) + assert.EqualValues(t, 0, payload.LeaderEpoch) + assert.EqualValues(t, 0, payload.PartitionEpoch) + assert.EqualValues(t, []string{"0224973c-badd-44cf-8744-45a99619da34"}, payload.Directories) } -func TestEncodeBeginTransactionRecord(t *testing.T) { +func TestEncodeBeginTransactionRecordPayload(t *testing.T) { beginTransactionRecord := kafkaapi.ClusterMetadataPayload{ FrameVersion: 1, Type: 23, @@ -348,7 +161,7 @@ func TestEncodeBeginTransactionRecord(t *testing.T) { assert.Equal(t, "01170001001212426f6f747374726170207265636f726473", hex.EncodeToString(bytes)) } -func TestEncodeFeatureLevelRecord(t *testing.T) { +func TestEncodeFeatureLevelRecordPayload(t *testing.T) { featureLevelRecord := kafkaapi.ClusterMetadataPayload{ FrameVersion: 1, Type: 12, @@ -365,7 +178,7 @@ func TestEncodeFeatureLevelRecord(t *testing.T) { assert.Equal(t, "010c00116d657461646174612e76657273696f6e001400", hex.EncodeToString(bytes)) } -func TestEncodeZKMigrationRecord(t *testing.T) { +func TestEncodeZKMigrationRecordPayload(t *testing.T) { zkMigrationRecord := kafkaapi.ClusterMetadataPayload{ FrameVersion: 1, Type: 21, @@ -378,7 +191,7 @@ func TestEncodeZKMigrationRecord(t *testing.T) { assert.Equal(t, "0115000000", hex.EncodeToString(bytes)) } -func TestEncodeEndTransactionRecord(t *testing.T) { +func TestEncodeEndTransactionRecordPayload(t *testing.T) { endTransactionRecord := kafkaapi.ClusterMetadataPayload{ FrameVersion: 1, Type: 24, @@ -392,7 +205,7 @@ func TestEncodeEndTransactionRecord(t *testing.T) { assert.Equal(t, "01180000", hex.EncodeToString(bytes)) } -func TestEncodeTopicRecord(t *testing.T) { +func TestEncodeTopicRecordPayload(t *testing.T) { topicRecord := kafkaapi.ClusterMetadataPayload{ FrameVersion: 1, Type: 2, @@ -407,7 +220,7 @@ func TestEncodeTopicRecord(t *testing.T) { assert.Equal(t, "01020004666f6fbfd99e5e3235455281f8d4af1741970c00", hex.EncodeToString(bytes)) } -func TestEncodePartitionRecord(t *testing.T) { +func TestEncodePartitionRecordPayload(t *testing.T) { partitionRecord := kafkaapi.ClusterMetadataPayload{ FrameVersion: 1, Type: 3, diff --git a/internal/fetch_test.go b/internal/fetch_test.go index 2caafaf..0aed659 100644 --- a/internal/fetch_test.go +++ b/internal/fetch_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestFetchv16_0m(t *testing.T) { +func TestFetchv16With0Messages(t *testing.T) { hexdump := "0000000b000000000000004fd5de7102c2a21ee23db74b6cbcc32a051cc51fc90200000000000000000000000000000000000000000000000000000000000000ffffffff01000000" b, err := hex.DecodeString(hexdump) @@ -26,13 +26,13 @@ func TestFetchv16_0m(t *testing.T) { if err = header.DecodeV1(&decoder, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } response := kafkaapi.FetchResponse{Version: 16} if err = response.Decode(&decoder, 16, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } assert.NoError(t, err) @@ -55,7 +55,7 @@ func TestFetchv16_0m(t *testing.T) { } } -func TestFetchv16_1m(t *testing.T) { +func TestFetchv16With1Message(t *testing.T) { hexdump := "0000000b000000000000007a983da40282e9d296c41249f0bcc9b03cdcc4888a0200000000000000000000000000010000000000000001000000000000000000ffffffff4900000000000000000000003c0000000002efe33e7c00000000000000000191937e258d00000191937e258d0000000000000001000000000000000000011400000001086d73673100000000" b, err := hex.DecodeString(hexdump) @@ -70,13 +70,13 @@ func TestFetchv16_1m(t *testing.T) { if err = header.DecodeV1(&decoder, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } response := kafkaapi.FetchResponse{Version: 16} if err = response.Decode(&decoder, 16, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } assert.NoError(t, err) @@ -107,7 +107,7 @@ func TestFetchv16_1m(t *testing.T) { assert.Equal(t, []string{"msg1"}, messages) } -func TestFetchv16_2m(t *testing.T) { +func TestFetchv16With2Messages(t *testing.T) { hexdump := "0000000b00000000000000064771770282e9d296c41249f0bcc9b03cdcc4888a0200000000000000000000000000020000000000000002000000000000000000ffffffff910100000000000000000000003c0000000002efe33e7c00000000000000000191937e258d00000191937e258d0000000000000001000000000000000000011400000001086d7367310000000000000000010000003c00000000026501ee010000000000000000019193877fe20000019193877fe20000000000000002000000000000000000011400000001086d73673200000000" b, err := hex.DecodeString(hexdump) @@ -123,13 +123,13 @@ func TestFetchv16_2m(t *testing.T) { if err = header.DecodeV1(&decoder, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } response := kafkaapi.FetchResponse{Version: 16} if err = response.Decode(&decoder, 16, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } assert.NoError(t, err) @@ -157,7 +157,7 @@ func TestFetchv16_2m(t *testing.T) { assert.Equal(t, []string{"msg1", "msg2"}, messages) } -func TestFetchv16_3m(t *testing.T) { +func TestFetchv16With3Messages(t *testing.T) { hexdump := "0000000b0000000000000019ffd72602c2a21ee23db74b6cbcc32a051cc51fc90200000000000000000000000000030000000000000003000000000000000000ffffffffd90100000000000000000000003c00000000026eca8a5d0000000000000000019192c16b5d0000019192c16b5d0000000000000000000000000000000000011400000001086d7367310000000000000000010000003c0000000002ad286abc0000000000000000019192c170b70000019192c170b70000000000000000000000000001000000011400000001086d7367320000000000000000020000003c0000000002d0470b040000000000000000019192c176990000019192c176990000000000000000000000000002000000011400000001086d73673300000000" b, err := hex.DecodeString(hexdump) @@ -173,13 +173,13 @@ func TestFetchv16_3m(t *testing.T) { if err = header.DecodeV1(&decoder, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } response := kafkaapi.FetchResponse{Version: 16} if err = response.Decode(&decoder, 16, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } assert.NoError(t, err) @@ -221,13 +221,13 @@ func TestAPIVersionv3(t *testing.T) { responseHeader := kafkaapi.ResponseHeader{} if err := responseHeader.DecodeV0(&decoder, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } apiVersionsResponse := kafkaapi.ApiVersionsResponse{Version: 3} if err := apiVersionsResponse.Decode(&decoder, 3, logger.GetQuietLogger(""), 0); err != nil { fmt.Println(decoder.FormatDetailedError(err.Error())) - panic("I QUIT") + return } assert.NoError(t, err) diff --git a/internal/stagef2.go b/internal/stagef2.go index de5dc05..b415fc9 100644 --- a/internal/stagef2.go +++ b/internal/stagef2.go @@ -3,6 +3,7 @@ package internal import ( "fmt" + "github.com/codecrafters-io/kafka-tester/internal/assertions" "github.com/codecrafters-io/kafka-tester/internal/kafka_executable" "github.com/codecrafters-io/kafka-tester/protocol" kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api" @@ -65,10 +66,12 @@ func testFetchWithNoTopics(stageHarness *test_case_harness.TestCaseHarness) erro return err } - if responseHeader.CorrelationId != correlationId { - return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseHeader.CorrelationId) + expectedResponseHeader := kafkaapi.ResponseHeader{ + CorrelationId: correlationId, + } + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + return err } - logger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) if responseBody.ErrorCode != 0 { return fmt.Errorf("Expected Error code to be 0, got %v", responseBody.ErrorCode) diff --git a/internal/stagef3.go b/internal/stagef3.go index 14f753e..dcec3f7 100644 --- a/internal/stagef3.go +++ b/internal/stagef3.go @@ -3,6 +3,7 @@ package internal import ( "fmt" + "github.com/codecrafters-io/kafka-tester/internal/assertions" "github.com/codecrafters-io/kafka-tester/internal/kafka_executable" "github.com/codecrafters-io/kafka-tester/protocol" kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api" @@ -82,10 +83,12 @@ func testFetchWithUnkownTopicID(stageHarness *test_case_harness.TestCaseHarness) return err } - if responseHeader.CorrelationId != correlationId { - return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseHeader.CorrelationId) + expectedResponseHeader := kafkaapi.ResponseHeader{ + CorrelationId: correlationId, + } + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + return err } - logger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) if responseBody.ErrorCode != 0 { return fmt.Errorf("Expected Error code to be 0, got %v", responseBody.ErrorCode) diff --git a/internal/test_helpers/fixtures/base/pass b/internal/test_helpers/fixtures/base/pass index 0ff9f4f..4bf51c3 100644 --- a/internal/test_helpers/fixtures/base/pass +++ b/internal/test_helpers/fixtures/base/pass @@ -3,18 +3,19 @@ Debug = true [stage-5] Running tests for Stage #5: pv1 [stage-5] $ ./your_program.sh /tmp/server.properties [stage-5] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/partition.metadata -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/partition.metadata -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/00000000000000000000.log -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/00000000000000000000.log -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-5] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-5] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-5] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-5] Connecting to broker at: localhost:9092 [stage-5] Connection to broker at localhost:9092 successful @@ -387,18 +388,19 @@ Debug = true [stage-4] Running tests for Stage #4: nc5 [stage-4] $ ./your_program.sh /tmp/server.properties [stage-4] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-4] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-4] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-4] Connecting to broker at: localhost:9092 [stage-4] Connection to broker at localhost:9092 successful @@ -430,18 +432,19 @@ Debug = true [stage-3] Running tests for Stage #3: wa6 [stage-3] $ ./your_program.sh /tmp/server.properties [stage-3] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-3] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-3] Connecting to broker at: localhost:9092 [stage-3] Connection to broker at localhost:9092 successful @@ -488,7 +491,7 @@ Debug = true [stage-3] 01d0 | 00 0e 6b 72 61 66 74 2e 76 65 72 73 69 6f 6e 00 | ..kraft.version. [stage-3] 01e0 | 00 00 01 00 11 6d 65 74 61 64 61 74 61 2e 76 65 | .....metadata.ve [stage-3] 01f0 | 72 73 69 6f 6e 00 01 00 16 00 01 08 00 00 00 00 | rsion........... -[stage-3] 0200 | 00 00 00 11 02 17 02 11 6d 65 74 61 64 61 74 61 | ........metadata +[stage-3] 0200 | 00 00 00 0c 02 17 02 11 6d 65 74 61 64 61 74 61 | ........metadata [stage-3] 0210 | 2e 76 65 72 73 69 6f 6e 00 14 00 14 00 | .version..... [stage-3]  [stage-3] [Decoder] - .Response @@ -503,18 +506,19 @@ Debug = true [stage-2] Running tests for Stage #2: nv3 [stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-2] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-2] Connecting to broker at: localhost:9092 [stage-2] Connection to broker at localhost:9092 successful @@ -576,18 +580,19 @@ Debug = true [stage-1] Running tests for Stage #1: vi6 [stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-1] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-1] Connecting to port 9092... [stage-1] Connection successful diff --git a/internal/test_helpers/fixtures/concurrent_stages/pass b/internal/test_helpers/fixtures/concurrent_stages/pass index d5d9cc1..eb2cebf 100644 --- a/internal/test_helpers/fixtures/concurrent_stages/pass +++ b/internal/test_helpers/fixtures/concurrent_stages/pass @@ -3,18 +3,19 @@ Debug = true [stage-2] Running tests for Stage #2: nh4 [stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-2] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-2] Connecting to broker at: localhost:9092 [stage-2] Connection to broker at localhost:9092 successful @@ -60,7 +61,7 @@ Debug = true [stage-2] 01c0 | 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 0e 6b 72 | .version......kr [stage-2] 01d0 | 61 66 74 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 | aft.version..... [stage-2] 01e0 | 11 6d 65 74 61 64 61 74 61 2e 76 65 72 73 69 6f | .metadata.versio -[stage-2] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 12 | n............... +[stage-2] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 0c | n............... [stage-2] 0200 | 02 17 02 11 6d 65 74 61 64 61 74 61 2e 76 65 72 | ....metadata.ver [stage-2] 0210 | 73 69 6f 6e 00 14 00 14 00 | sion..... [stage-2]  @@ -423,7 +424,7 @@ Debug = true [stage-2] 01c0 | 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 0e 6b 72 | .version......kr [stage-2] 01d0 | 61 66 74 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 | aft.version..... [stage-2] 01e0 | 11 6d 65 74 61 64 61 74 61 2e 76 65 72 73 69 6f | .metadata.versio -[stage-2] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 12 | n............... +[stage-2] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 0c | n............... [stage-2] 0200 | 02 17 02 11 6d 65 74 61 64 61 74 61 2e 76 65 72 | ....metadata.ver [stage-2] 0210 | 73 69 6f 6e 00 14 00 14 00 | sion..... [stage-2]  @@ -751,18 +752,19 @@ Debug = true [stage-1] Running tests for Stage #1: sk0 [stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/quz-1/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-1] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-1] Connecting to broker at: localhost:9092 [stage-1] Connection to broker at localhost:9092 successful @@ -818,7 +820,7 @@ Debug = true [stage-1] 01c0 | 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 0e 6b 72 | .version......kr [stage-1] 01d0 | 61 66 74 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 | aft.version..... [stage-1] 01e0 | 11 6d 65 74 61 64 61 74 61 2e 76 65 72 73 69 6f | .metadata.versio -[stage-1] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 12 | n............... +[stage-1] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 0c | n............... [stage-1] 0200 | 02 17 02 11 6d 65 74 61 64 61 74 61 2e 76 65 72 | ....metadata.ver [stage-1] 0210 | 73 69 6f 6e 00 14 00 14 00 | sion..... [stage-1]  @@ -1173,7 +1175,7 @@ Debug = true [stage-1] 01c0 | 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 0e 6b 72 | .version......kr [stage-1] 01d0 | 61 66 74 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 | aft.version..... [stage-1] 01e0 | 11 6d 65 74 61 64 61 74 61 2e 76 65 72 73 69 6f | .metadata.versio -[stage-1] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 12 | n............... +[stage-1] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 0c | n............... [stage-1] 0200 | 02 17 02 11 6d 65 74 61 64 61 74 61 2e 76 65 72 | ....metadata.ver [stage-1] 0210 | 73 69 6f 6e 00 14 00 14 00 | sion..... [stage-1]  diff --git a/internal/test_helpers/fixtures/describe_topic_partitions/pass b/internal/test_helpers/fixtures/describe_topic_partitions/pass index f475199..4923563 100644 --- a/internal/test_helpers/fixtures/describe_topic_partitions/pass +++ b/internal/test_helpers/fixtures/describe_topic_partitions/pass @@ -46,7 +46,7 @@ Debug = true [stage-5] 01c0 | 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 0e 6b 72 | .version......kr [stage-5] 01d0 | 61 66 74 2e 76 65 72 73 69 6f 6e 00 00 00 01 00 | aft.version..... [stage-5] 01e0 | 11 6d 65 74 61 64 61 74 61 2e 76 65 72 73 69 6f | .metadata.versio -[stage-5] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 12 | n............... +[stage-5] 01f0 | 6e 00 01 00 16 00 01 08 00 00 00 00 00 00 00 0c | n............... [stage-5] 0200 | 02 17 02 11 6d 65 74 61 64 61 74 61 2e 76 65 72 | ....metadata.ver [stage-5] 0210 | 73 69 6f 6e 00 14 00 14 00 | sion..... [stage-5]  @@ -408,7 +408,8 @@ Debug = true [stage-4] [Decoder]  - .num_partitions (0) [stage-4] [Decoder]  - .topic_authorized_operations (3576) [stage-4] [Decoder]  - .TAG_BUFFER -[stage-4] [Decoder]  - .next_cursor (null) +[stage-4] [Decoder]  - .next_cursor +[stage-4] [Decoder]  - .cursor_is_null (0xFF) [stage-4] [Decoder]  - .TAG_BUFFER [stage-4] ✓ Correlation ID: 1616512461 [stage-4] ✓ TopicResponse Error code: 3 (UNKNOWN_TOPIC_OR_PARTITION) @@ -421,18 +422,19 @@ Debug = true [stage-3] Running tests for Stage #3: ea7 [stage-3] $ ./your_program.sh /tmp/server.properties [stage-3] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-3] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-3] Connecting to broker at: localhost:9092 [stage-3] Connection to broker at localhost:9092 successful @@ -448,7 +450,7 @@ Debug = true [stage-3] Idx | Hex | ASCII [stage-3] -----+-------------------------------------------------+----------------- [stage-3] 0000 | 05 c3 b0 2a 00 00 00 00 00 02 00 00 04 62 61 7a | ...*.........baz -[stage-3] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 53 | ......@........S +[stage-3] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 19 | ......@......... [stage-3] 0020 | 00 02 00 00 00 00 00 00 00 00 00 01 00 00 00 00 | ................ [stage-3] 0030 | 02 00 00 00 01 02 00 00 00 01 01 01 01 00 00 00 | ................ [stage-3] 0040 | 0d f8 00 ff 00 | ..... @@ -462,7 +464,7 @@ Debug = true [stage-3] [Decoder]  - .Topics[0] [stage-3] [Decoder]  - .error_code (0) [stage-3] [Decoder]  - .name (baz) -[stage-3] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000053) +[stage-3] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000019) [stage-3] [Decoder]  - .is_internal (false) [stage-3] [Decoder]  - .num_partitions (1) [stage-3] [Decoder]  - .Partitions[0] @@ -478,12 +480,13 @@ Debug = true [stage-3] [Decoder]  - .TAG_BUFFER [stage-3] [Decoder]  - .topic_authorized_operations (3576) [stage-3] [Decoder]  - .TAG_BUFFER -[stage-3] [Decoder]  - .next_cursor (null) +[stage-3] [Decoder]  - .next_cursor +[stage-3] [Decoder]  - .cursor_is_null (0xFF) [stage-3] [Decoder]  - .TAG_BUFFER [stage-3] ✓ Correlation ID: 96710698 [stage-3] ✓ TopicResponse Error code: 0 [stage-3] ✓ Topic Name: baz -[stage-3] ✓ Topic UUID: 00000000-0000-4000-8000-000000000053 +[stage-3] ✓ Topic UUID: 00000000-0000-4000-8000-000000000019 [stage-3] ✓ PartitionResponse Error code: 0 [stage-3] ✓ PartitionResponse Partition Index: 0 [stage-3] Test passed. @@ -493,18 +496,19 @@ Debug = true [stage-2] Running tests for Stage #2: ku4 [stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-2] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-2] Connecting to broker at: localhost:9092 [stage-2] Connection to broker at localhost:9092 successful @@ -513,14 +517,14 @@ Debug = true [stage-2] Idx | Hex | ASCII [stage-2] -----+-------------------------------------------------+----------------- [stage-2] 0000 | 00 00 00 23 00 4b 00 00 0f 2e 14 fa 00 0c 6b 61 | ...#.K........ka -[stage-2] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 04 71 75 78 | fka-tester...qux +[stage-2] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 04 70 61 7a | fka-tester...paz [stage-2] 0020 | 00 00 00 00 02 ff 00 | ....... [stage-2]  [stage-2] Hexdump of received "DescribeTopicPartition" response:  [stage-2] Idx | Hex | ASCII [stage-2] -----+-------------------------------------------------+----------------- -[stage-2] 0000 | 0f 2e 14 fa 00 00 00 00 00 02 00 00 04 71 75 78 | .............qux -[stage-2] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 11 | ......@......... +[stage-2] 0000 | 0f 2e 14 fa 00 00 00 00 00 02 00 00 04 70 61 7a | .............paz +[stage-2] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 43 | ......@........C [stage-2] 0020 | 00 03 00 00 00 00 00 00 00 00 00 01 00 00 00 00 | ................ [stage-2] 0030 | 02 00 00 00 01 02 00 00 00 01 01 01 01 00 00 00 | ................ [stage-2] 0040 | 00 00 00 01 00 00 00 01 00 00 00 00 02 00 00 00 | ................ @@ -535,8 +539,8 @@ Debug = true [stage-2] [Decoder]  - .topic.length (1) [stage-2] [Decoder]  - .Topics[0] [stage-2] [Decoder]  - .error_code (0) -[stage-2] [Decoder]  - .name (qux) -[stage-2] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000011) +[stage-2] [Decoder]  - .name (paz) +[stage-2] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000043) [stage-2] [Decoder]  - .is_internal (false) [stage-2] [Decoder]  - .num_partitions (2) [stage-2] [Decoder]  - .Partitions[0] @@ -563,12 +567,13 @@ Debug = true [stage-2] [Decoder]  - .TAG_BUFFER [stage-2] [Decoder]  - .topic_authorized_operations (3576) [stage-2] [Decoder]  - .TAG_BUFFER -[stage-2] [Decoder]  - .next_cursor (null) +[stage-2] [Decoder]  - .next_cursor +[stage-2] [Decoder]  - .cursor_is_null (0xFF) [stage-2] [Decoder]  - .TAG_BUFFER [stage-2] ✓ Correlation ID: 254678266 [stage-2] ✓ TopicResponse Error code: 0 -[stage-2] ✓ Topic Name: qux -[stage-2] ✓ Topic UUID: 00000000-0000-4000-8000-000000000011 +[stage-2] ✓ Topic Name: paz +[stage-2] ✓ Topic UUID: 00000000-0000-4000-8000-000000000043 [stage-2] ✓ PartitionResponse[0] Error code: 0 [stage-2] ✓ PartitionResponse[0] Partition Index: 0 [stage-2] ✓ PartitionResponse[1] Error code: 0 @@ -580,18 +585,19 @@ Debug = true [stage-1] Running tests for Stage #1: wq2 [stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-1] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-1] Connecting to broker at: localhost:9092 [stage-1] Connection to broker at localhost:9092 successful @@ -601,22 +607,22 @@ Debug = true [stage-1] -----+-------------------------------------------------+----------------- [stage-1] 0000 | 00 00 00 2d 00 4b 00 00 70 d8 81 15 00 0c 6b 61 | ...-.K..p.....ka [stage-1] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 04 04 62 61 7a | fka-tester...baz -[stage-1] 0020 | 00 04 66 6f 6f 00 04 71 75 78 00 00 00 00 04 ff | ..foo..qux...... +[stage-1] 0020 | 00 04 66 6f 6f 00 04 70 61 7a 00 00 00 00 04 ff | ..foo..paz...... [stage-1] 0030 | 00 | . [stage-1]  [stage-1] Hexdump of received "DescribeTopicPartition" response:  [stage-1] Idx | Hex | ASCII [stage-1] -----+-------------------------------------------------+----------------- [stage-1] 0000 | 70 d8 81 15 00 00 00 00 00 04 00 00 04 62 61 7a | p............baz -[stage-1] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 53 | ......@........S +[stage-1] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 19 | ......@......... [stage-1] 0020 | 00 02 00 00 00 00 00 00 00 00 00 01 00 00 00 00 | ................ [stage-1] 0030 | 02 00 00 00 01 02 00 00 00 01 01 01 01 00 00 00 | ................ [stage-1] 0040 | 0d f8 00 00 00 04 66 6f 6f 00 00 00 00 00 00 40 | ......foo......@ -[stage-1] 0050 | 00 80 00 00 00 00 00 00 33 00 02 00 00 00 00 00 | ........3....... +[stage-1] 0050 | 00 80 00 00 00 00 00 00 71 00 02 00 00 00 00 00 | ........q....... [stage-1] 0060 | 00 00 00 00 01 00 00 00 00 02 00 00 00 01 02 00 | ................ -[stage-1] 0070 | 00 00 01 01 01 01 00 00 00 0d f8 00 00 00 04 71 | ...............q -[stage-1] 0080 | 75 78 00 00 00 00 00 00 40 00 80 00 00 00 00 00 | ux......@....... -[stage-1] 0090 | 00 11 00 03 00 00 00 00 00 00 00 00 00 01 00 00 | ................ +[stage-1] 0070 | 00 00 01 01 01 01 00 00 00 0d f8 00 00 00 04 70 | ...............p +[stage-1] 0080 | 61 7a 00 00 00 00 00 00 40 00 80 00 00 00 00 00 | az......@....... +[stage-1] 0090 | 00 43 00 03 00 00 00 00 00 00 00 00 00 01 00 00 | .C.............. [stage-1] 00a0 | 00 00 02 00 00 00 01 02 00 00 00 01 01 01 01 00 | ................ [stage-1] 00b0 | 00 00 00 00 00 01 00 00 00 01 00 00 00 00 02 00 | ................ [stage-1] 00c0 | 00 00 01 02 00 00 00 01 01 01 01 00 00 00 0d f8 | ................ @@ -631,7 +637,7 @@ Debug = true [stage-1] [Decoder]  - .Topics[0] [stage-1] [Decoder]  - .error_code (0) [stage-1] [Decoder]  - .name (baz) -[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000053) +[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000019) [stage-1] [Decoder]  - .is_internal (false) [stage-1] [Decoder]  - .num_partitions (1) [stage-1] [Decoder]  - .Partitions[0] @@ -650,7 +656,7 @@ Debug = true [stage-1] [Decoder]  - .Topics[1] [stage-1] [Decoder]  - .error_code (0) [stage-1] [Decoder]  - .name (foo) -[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000033) +[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000071) [stage-1] [Decoder]  - .is_internal (false) [stage-1] [Decoder]  - .num_partitions (1) [stage-1] [Decoder]  - .Partitions[0] @@ -668,8 +674,8 @@ Debug = true [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] [Decoder]  - .Topics[2] [stage-1] [Decoder]  - .error_code (0) -[stage-1] [Decoder]  - .name (qux) -[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000011) +[stage-1] [Decoder]  - .name (paz) +[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000043) [stage-1] [Decoder]  - .is_internal (false) [stage-1] [Decoder]  - .num_partitions (2) [stage-1] [Decoder]  - .Partitions[0] @@ -696,22 +702,23 @@ Debug = true [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] [Decoder]  - .topic_authorized_operations (3576) [stage-1] [Decoder]  - .TAG_BUFFER -[stage-1] [Decoder]  - .next_cursor (null) +[stage-1] [Decoder]  - .next_cursor +[stage-1] [Decoder]  - .cursor_is_null (0xFF) [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] ✓ Correlation ID: 1893237013 [stage-1] ✓ TopicResponse Error code: 0 [stage-1] ✓ Topic Name: baz -[stage-1] ✓ Topic UUID: 00000000-0000-4000-8000-000000000053 +[stage-1] ✓ Topic UUID: 00000000-0000-4000-8000-000000000019 [stage-1] ✓ PartitionResponse[0] Error code: 0 [stage-1] ✓ PartitionResponse[0] Partition Index: 0 [stage-1] ✓ TopicResponse Error code: 0 [stage-1] ✓ Topic Name: foo -[stage-1] ✓ Topic UUID: 00000000-0000-4000-8000-000000000033 +[stage-1] ✓ Topic UUID: 00000000-0000-4000-8000-000000000071 [stage-1] ✓ PartitionResponse[0] Error code: 0 [stage-1] ✓ PartitionResponse[0] Partition Index: 0 [stage-1] ✓ TopicResponse Error code: 0 -[stage-1] ✓ Topic Name: qux -[stage-1] ✓ Topic UUID: 00000000-0000-4000-8000-000000000011 +[stage-1] ✓ Topic Name: paz +[stage-1] ✓ Topic UUID: 00000000-0000-4000-8000-000000000043 [stage-1] ✓ PartitionResponse[0] Error code: 0 [stage-1] ✓ PartitionResponse[0] Partition Index: 0 [stage-1] ✓ PartitionResponse[1] Error code: 0 diff --git a/internal/test_helpers/fixtures/fetch/pass b/internal/test_helpers/fixtures/fetch/pass index dadc875..3838bdb 100644 --- a/internal/test_helpers/fixtures/fetch/pass +++ b/internal/test_helpers/fixtures/fetch/pass @@ -3,18 +3,19 @@ Debug = true [stage-4] Running tests for Stage #4: gs0 [stage-4] $ ./your_program.sh /tmp/server.properties [stage-4] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-4] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-4] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-4] Connecting to broker at: localhost:9092 [stage-4] Connection to broker at localhost:9092 successful @@ -390,18 +391,19 @@ Debug = true [stage-3] Running tests for Stage #3: dh6 [stage-3] $ ./your_program.sh /tmp/server.properties [stage-3] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-3] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-3] Connecting to broker at: localhost:9092 [stage-3] Connection to broker at localhost:9092 successful @@ -417,7 +419,7 @@ Debug = true [stage-3] Hexdump of received "Fetch" response:  [stage-3] Idx | Hex | ASCII [stage-3] -----+-------------------------------------------------+----------------- -[stage-3] 0000 | 60 5a 05 cd 00 00 00 00 00 00 00 03 ad 09 d2 01 | `Z.............. +[stage-3] 0000 | 60 5a 05 cd 00 00 00 00 00 00 00 08 34 e5 4d 01 | `Z..........4.M. [stage-3] 0010 | 00 | . [stage-3]  [stage-3] [Decoder] - .ResponseHeader @@ -426,7 +428,7 @@ Debug = true [stage-3] [Decoder] - .ResponseBody [stage-3] [Decoder]  - .throttle_time_ms (0) [stage-3] [Decoder]  - .error_code (0) -[stage-3] [Decoder]  - .session_id (61671890) +[stage-3] [Decoder]  - .session_id (137684301) [stage-3] [Decoder]  - .num_responses (0) [stage-3] [Decoder]  - .TAG_BUFFER [stage-3] ✓ Correlation ID: 1616512461 @@ -439,18 +441,19 @@ Debug = true [stage-2] Running tests for Stage #2: hn6 [stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-2] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-2] Connecting to broker at: localhost:9092 [stage-2] Connection to broker at localhost:9092 successful @@ -461,7 +464,7 @@ Debug = true [stage-2] 0000 | 00 00 00 60 00 01 00 10 05 c3 b0 2a 00 09 6b 61 | ...`.......*..ka [stage-2] 0010 | 66 6b 61 2d 63 6c 69 00 00 00 01 f4 00 00 00 01 | fka-cli......... [stage-2] 0020 | 03 20 00 00 00 00 00 00 00 00 00 00 00 02 00 00 | . .............. -[stage-2] 0030 | 00 00 00 00 00 00 00 00 00 00 00 00 43 06 02 00 | ............C... +[stage-2] 0030 | 00 00 00 00 00 00 00 00 00 00 00 00 91 58 02 00 | .............X.. [stage-2] 0040 | 00 00 00 ff ff ff ff 00 00 00 00 00 00 00 00 ff | ................ [stage-2] 0050 | ff ff ff ff ff ff ff ff ff ff ff 00 10 00 00 00 | ................ [stage-2] 0060 | 00 01 01 00 | .... @@ -469,8 +472,8 @@ Debug = true [stage-2] Hexdump of received "Fetch" response:  [stage-2] Idx | Hex | ASCII [stage-2] -----+-------------------------------------------------+----------------- -[stage-2] 0000 | 05 c3 b0 2a 00 00 00 00 00 00 00 11 a7 e1 28 02 | ...*..........(. -[stage-2] 0010 | 00 00 00 00 00 00 00 00 00 00 00 00 00 00 43 06 | ..............C. +[stage-2] 0000 | 05 c3 b0 2a 00 00 00 00 00 00 00 0c 34 d2 e1 02 | ...*........4... +[stage-2] 0010 | 00 00 00 00 00 00 00 00 00 00 00 00 00 00 91 58 | ...............X [stage-2] 0020 | 02 00 00 00 00 00 64 ff ff ff ff ff ff ff ff ff | ......d......... [stage-2] 0030 | ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff 01 | ................ [stage-2] 0040 | ff ff ff ff 01 00 00 00 | ........ @@ -481,10 +484,10 @@ Debug = true [stage-2] [Decoder] - .ResponseBody [stage-2] [Decoder]  - .throttle_time_ms (0) [stage-2] [Decoder]  - .error_code (0) -[stage-2] [Decoder]  - .session_id (296214824) +[stage-2] [Decoder]  - .session_id (204788449) [stage-2] [Decoder]  - .num_responses (1) [stage-2] [Decoder]  - .TopicResponse[0] -[stage-2] [Decoder]  - .topic_id (00000000-0000-0000-0000-000000004306) +[stage-2] [Decoder]  - .topic_id (00000000-0000-0000-0000-000000009158) [stage-2] [Decoder]  - .num_partitions (1) [stage-2] [Decoder]  - .PartitionResponse[0] [stage-2] [Decoder]  - .partition_index (0) @@ -500,7 +503,7 @@ Debug = true [stage-2] [Decoder]  - .TAG_BUFFER [stage-2] ✓ Correlation ID: 96710698 [stage-2] ✓ Error code: 0 (NO_ERROR) -[stage-2] ✓ Topic UUID: 00000000-0000-0000-0000-000000004306 +[stage-2] ✓ Topic UUID: 00000000-0000-0000-0000-000000009158 [stage-2] ✓ PartitionResponse Error code: 100 (UNKNOWN_TOPIC_ID) [stage-2] ✓ RecordBatches: [] [stage-2] Test passed. @@ -510,18 +513,19 @@ Debug = true [stage-1] Running tests for Stage #1: cm4 [stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] [Serializer] Writing log files to: /tmp/kraft-combined-logs -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/qux-1/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/server.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/paz-1/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-1] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs [stage-1] Connecting to broker at: localhost:9092 [stage-1] Connection to broker at localhost:9092 successful @@ -532,7 +536,7 @@ Debug = true [stage-1] 0000 | 00 00 00 60 00 01 00 10 0f 2e 14 fa 00 09 6b 61 | ...`..........ka [stage-1] 0010 | 66 6b 61 2d 63 6c 69 00 00 00 01 f4 00 00 00 01 | fka-cli......... [stage-1] 0020 | 03 20 00 00 00 00 00 00 00 00 00 00 00 02 00 00 | . .............. -[stage-1] 0030 | 00 00 00 00 40 00 80 00 00 00 00 00 00 53 02 00 | ....@........S.. +[stage-1] 0030 | 00 00 00 00 40 00 80 00 00 00 00 00 00 19 02 00 | ....@........... [stage-1] 0040 | 00 00 00 ff ff ff ff 00 00 00 00 00 00 00 00 ff | ................ [stage-1] 0050 | ff ff ff ff ff ff ff ff ff ff ff 00 10 00 00 00 | ................ [stage-1] 0060 | 00 01 01 00 | .... @@ -540,16 +544,16 @@ Debug = true [stage-1] Hexdump of received "Fetch" response:  [stage-1] Idx | Hex | ASCII [stage-1] -----+-------------------------------------------------+----------------- -[stage-1] 0000 | 0f 2e 14 fa 00 00 00 00 00 00 00 2f f1 4b 1e 02 | .........../.K.. -[stage-1] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 53 | ......@........S +[stage-1] 0000 | 0f 2e 14 fa 00 00 00 00 00 00 00 01 a4 fa ac 02 | ................ +[stage-1] 0010 | 00 00 00 00 00 00 40 00 80 00 00 00 00 00 00 19 | ......@......... [stage-1] 0020 | 02 00 00 00 00 00 00 00 00 00 00 00 00 00 01 00 | ................ [stage-1] 0030 | 00 00 00 00 00 00 01 00 00 00 00 00 00 00 00 00 | ................ -[stage-1] 0040 | ff ff ff ff 51 00 00 00 00 00 00 00 00 00 00 00 | ....Q........... -[stage-1] 0050 | 44 00 00 00 00 02 98 ec 18 d3 00 00 00 00 00 00 | D............... +[stage-1] 0040 | ff ff ff ff 54 00 00 00 00 00 00 00 00 00 00 00 | ....T........... +[stage-1] 0050 | 47 00 00 00 00 02 b8 7c 9c ff 00 00 00 00 00 00 | G......|........ [stage-1] 0060 | 00 00 01 91 e0 5b 6d 8b 00 00 01 91 e0 5b 6d 8b | .....[m......[m. [stage-1] 0070 | 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | ................ -[stage-1] 0080 | 00 01 24 00 00 00 01 18 48 65 6c 6c 6f 20 57 6f | ..$.....Hello Wo -[stage-1] 0090 | 72 6c 64 21 00 00 00 00 | rld!.... +[stage-1] 0080 | 00 01 2a 00 00 00 01 1e 48 65 6c 6c 6f 20 55 6e | ..*.....Hello Un +[stage-1] 0090 | 69 76 65 72 73 65 21 00 00 00 00 | iverse!.... [stage-1]  [stage-1] [Decoder] - .ResponseHeader [stage-1] [Decoder]  - .correlation_id (254678266) @@ -557,10 +561,10 @@ Debug = true [stage-1] [Decoder] - .ResponseBody [stage-1] [Decoder]  - .throttle_time_ms (0) [stage-1] [Decoder]  - .error_code (0) -[stage-1] [Decoder]  - .session_id (804342558) +[stage-1] [Decoder]  - .session_id (27589292) [stage-1] [Decoder]  - .num_responses (1) [stage-1] [Decoder]  - .TopicResponse[0] -[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000053) +[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000019) [stage-1] [Decoder]  - .num_partitions (1) [stage-1] [Decoder]  - .PartitionResponse[0] [stage-1] [Decoder]  - .partition_index (0) @@ -570,13 +574,13 @@ Debug = true [stage-1] [Decoder]  - .log_start_offset (0) [stage-1] [Decoder]  - .num_aborted_transactions (0) [stage-1] [Decoder]  - .preferred_read_replica (-1) -[stage-1] [Decoder]  - .compact_records_length (80) +[stage-1] [Decoder]  - .compact_records_length (83) [stage-1] [Decoder]  - .RecordBatch[0] [stage-1] [Decoder]  - .base_offset (0) -[stage-1] [Decoder]  - .batch_length (68) +[stage-1] [Decoder]  - .batch_length (71) [stage-1] [Decoder]  - .partition_leader_epoch (0) [stage-1] [Decoder]  - .magic_byte (2) -[stage-1] [Decoder]  - .crc (-1729357613) +[stage-1] [Decoder]  - .crc (-1199792897) [stage-1] [Decoder]  - .record_attributes (0) [stage-1] [Decoder]  - .last_offset_delta (0) [stage-1] [Decoder]  - .base_timestamp (1726045973899) @@ -586,21 +590,21 @@ Debug = true [stage-1] [Decoder]  - .base_sequence (0) [stage-1] [Decoder]  - .num_records (1) [stage-1] [Decoder]  - .Record[0] -[stage-1] [Decoder]  - .length (18) +[stage-1] [Decoder]  - .length (21) [stage-1] [Decoder]  - .attributes (0) [stage-1] [Decoder]  - .timestamp_delta (0) [stage-1] [Decoder]  - .offset_delta (0) [stage-1] [Decoder]  - .key_length (-1) [stage-1] [Decoder]  - .key ("") -[stage-1] [Decoder]  - .value_length (12) -[stage-1] [Decoder]  - .value ("Hello World!") +[stage-1] [Decoder]  - .value_length (15) +[stage-1] [Decoder]  - .value ("Hello Universe!") [stage-1] [Decoder]  - .num_headers (0) [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] ✓ Correlation ID: 254678266 [stage-1] ✓ Error code: 0 (NO_ERROR) -[stage-1] ✓ Messages: ["Hello World!"] +[stage-1] ✓ Messages: ["Hello Universe!"] [stage-1] Test passed. [stage-1] Terminating program [stage-1] Program terminated successfully diff --git a/protocol/api/cluster_metadata_payloads.go b/protocol/api/cluster_metadata_payloads.go index 845c362..2d267d7 100644 --- a/protocol/api/cluster_metadata_payloads.go +++ b/protocol/api/cluster_metadata_payloads.go @@ -105,14 +105,14 @@ func (p *ClusterMetadataPayload) Decode(data []byte) (err error) { return err } - beginTransactionRecord.Name, err = partialDecoder.GetString() + beginTransactionRecord.Name, err = partialDecoder.GetCompactString() if err != nil { return err } } if partialDecoder.Remaining() > 0 { - return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "PARTITION_CHANGE_RECORD") + return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "BEGIN_TRANSACTION_RECORD") } case 21: zkMigrationStateRecord := &ZKMigrationStateRecord{} @@ -135,7 +135,7 @@ func (p *ClusterMetadataPayload) Decode(data []byte) (err error) { topicRecord := &TopicRecord{} p.Data = topicRecord - topicRecord.TopicName, err = partialDecoder.GetString() + topicRecord.TopicName, err = partialDecoder.GetCompactString() if err != nil { return err } @@ -151,13 +151,13 @@ func (p *ClusterMetadataPayload) Decode(data []byte) (err error) { } if partialDecoder.Remaining() > 0 { - return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "PRODUCER_IDS_RECORD") + return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "TOPIC_RECORD") } case 12: featureLevelRecord := &FeatureLevelRecord{} p.Data = featureLevelRecord - featureLevelRecord.Name, err = partialDecoder.GetString() + featureLevelRecord.Name, err = partialDecoder.GetCompactString() if err != nil { return err } @@ -185,7 +185,7 @@ func (p *ClusterMetadataPayload) Decode(data []byte) (err error) { } if partialDecoder.Remaining() > 0 { - return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "FEATURE_LEVEL_RECORD") + return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "END_TRANSACTION_RECORD") } case 3: @@ -202,22 +202,22 @@ func (p *ClusterMetadataPayload) Decode(data []byte) (err error) { return err } - partitionRecord.Replicas, err = partialDecoder.GetInt32Array() + partitionRecord.Replicas, err = partialDecoder.GetCompactInt32Array() if err != nil { return err } - partitionRecord.ISReplicas, err = partialDecoder.GetInt32Array() + partitionRecord.ISReplicas, err = partialDecoder.GetCompactInt32Array() if err != nil { return err } - partitionRecord.RemovingReplicas, err = partialDecoder.GetInt32Array() + partitionRecord.RemovingReplicas, err = partialDecoder.GetCompactInt32Array() if err != nil { return err } - partitionRecord.AddingReplicas, err = partialDecoder.GetInt32Array() + partitionRecord.AddingReplicas, err = partialDecoder.GetCompactInt32Array() if err != nil { return err } @@ -238,10 +238,18 @@ func (p *ClusterMetadataPayload) Decode(data []byte) (err error) { } if p.Version >= 1 { - partitionRecord.Directories, err = partialDecoder.GetStringArray() + arrayLength, err := partialDecoder.GetCompactArrayLength() if err != nil { return err } + + partitionRecord.Directories = make([]string, arrayLength) + for i := range partitionRecord.Directories { + partitionRecord.Directories[i], err = getUUID(&partialDecoder) + if err != nil { + return err + } + } } _, err = partialDecoder.GetUnsignedVarint() // taggedFieldCount @@ -250,7 +258,7 @@ func (p *ClusterMetadataPayload) Decode(data []byte) (err error) { } if partialDecoder.Remaining() > 0 { - return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "FEATURE_LEVEL_RECORD") + return errors.NewPacketDecodingError(fmt.Sprintf("Remaining bytes after decoding: %d", partialDecoder.Remaining()), "PARTITION_RECORD") } } diff --git a/protocol/api/describe_topic_partitions_response.go b/protocol/api/describe_topic_partitions_response.go index 602bd37..2c21182 100644 --- a/protocol/api/describe_topic_partitions_response.go +++ b/protocol/api/describe_topic_partitions_response.go @@ -50,6 +50,7 @@ func (a *DescribeTopicPartitionsResponse) Decode(pd *decoder.RealDecoder, logger } a.NextCursor = DescribeTopicPartitionsResponseCursor{} + protocol.LogWithIndentation(logger, indentation, "- .next_cursor") err = a.NextCursor.Decode(pd, logger, indentation+1) if err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -57,7 +58,6 @@ func (a *DescribeTopicPartitionsResponse) Decode(pd *decoder.RealDecoder, logger } return err } - protocol.LogWithIndentation(logger, indentation, "- .next_cursor (%v)", a.NextCursor) if _, err := pd.GetEmptyTaggedFieldArray(); err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -287,11 +287,13 @@ func (c *DescribeTopicPartitionsResponseCursor) Decode(pd *decoder.RealDecoder, // This field is nullable, the first byte indicates whether it's null or not checkPresence, err := pd.GetInt8() if err != nil { - if decodingErr, ok := err.(*errors.PacketDecodingError); ok { - return decodingErr.WithAddedContext("cursor_type") + if _, ok := err.(*errors.PacketDecodingError); ok { + customError := errors.NewPacketDecodingError(fmt.Sprintf("Expected either 0xFF (cursor is null) or 0x01 (cursor is not null), got 0x%02X", uint8(checkPresence)), "RAW_BYTES") + return customError.WithAddedContext("cursor_is_null") } return err } + protocol.LogWithIndentation(logger, indentation, "- .cursor_is_null (0x%02X)", uint8(checkPresence)) if checkPresence == -1 { c = nil diff --git a/protocol/serializer/cluster_metadata.go b/protocol/serializer/cluster_metadata.go index 31d2b28..e86c015 100644 --- a/protocol/serializer/cluster_metadata.go +++ b/protocol/serializer/cluster_metadata.go @@ -248,6 +248,6 @@ func writeClusterMetadata(path string, topic1Name string, topic1UUID string, top return fmt.Errorf("error writing file to %s: %w", path, err) } - logger.Debugf(" - Wrote file to: %s", path) + logger.Debugf(" - Wrote file to: %s", path) return nil } diff --git a/protocol/serializer/generate_log_dir.go b/protocol/serializer/generate_log_dir.go index 890ef06..fcb5006 100644 --- a/protocol/serializer/generate_log_dir.go +++ b/protocol/serializer/generate_log_dir.go @@ -49,6 +49,7 @@ func GenerateLogDirs(logger *logger.Logger) error { clusterMetadataDirectory := fmt.Sprintf("%s/__cluster_metadata-0", basePath) kraftServerPropertiesPath := fmt.Sprintf(common.SERVER_PROPERTIES_FILE_PATH) + kafkaCleanShutdownPath := fmt.Sprintf("%s/.kafka_cleanshutdown", basePath) metaPropertiesPath := fmt.Sprintf("%s/meta.properties", basePath) topic1MetadataPath := fmt.Sprintf("%s/partition.metadata", topic1MetadataDirectory) topic2MetadataPath := fmt.Sprintf("%s/partition.metadata", topic2MetadataDirectory) @@ -69,11 +70,21 @@ func GenerateLogDirs(logger *logger.Logger) error { logger.UpdateSecondaryPrefix("Serializer") logger.Debugf("Writing log files to: %s", basePath) + err = writeKraftServerProperties(kraftServerPropertiesPath, logger) + if err != nil { + return err + } + err = writeMetaProperties(metaPropertiesPath, clusterID, directoryID, nodeID, version, logger) if err != nil { return err } + err = writeKafkaCleanShutdown(kafkaCleanShutdownPath, logger) + if err != nil { + return err + } + err = writePartitionMetadata(topic1MetadataPath, 0, topic1ID, logger) if err != nil { return err @@ -124,11 +135,6 @@ func GenerateLogDirs(logger *logger.Logger) error { return err } - err = writeKraftServerProperties(kraftServerPropertiesPath, logger) - if err != nil { - return err - } - logger.Infof("Finished writing log files to: %s", basePath) logger.ResetSecondaryPrefix() diff --git a/protocol/serializer/meta_properties.go b/protocol/serializer/meta_properties.go index ded6749..061b2dd 100644 --- a/protocol/serializer/meta_properties.go +++ b/protocol/serializer/meta_properties.go @@ -24,7 +24,7 @@ log.dirs=/tmp/kraft-combined-logs` return fmt.Errorf("error writing file to %s: %w", path, err) } - logger.Debugf(" - Wrote file to: %s", path) + logger.Debugf(" - Wrote file to: %s", path) return nil } @@ -37,6 +37,19 @@ func writeMetaProperties(path, clusterID, directoryID string, nodeID, version in return fmt.Errorf("error writing meta properties file: %w", err) } - logger.Debugf(" - Wrote file to: %s", path) + logger.Debugf(" - Wrote file to: %s", path) + return nil +} + +// writeKafkaCleanShutdown writes the hard-coded .kafka_cleanshutdown content to path +func writeKafkaCleanShutdown(path string, logger *logger.Logger) error { + kafkaCleanShutdown := `{"version":0,"brokerEpoch":10}` + + err := os.WriteFile(path, []byte(kafkaCleanShutdown), 0644) + if err != nil { + return fmt.Errorf("error writing file to %s: %w", path, err) + } + + logger.Debugf(" - Wrote file to: %s", path) return nil } diff --git a/protocol/serializer/topic_data.go b/protocol/serializer/topic_data.go index 621aee1..6592102 100644 --- a/protocol/serializer/topic_data.go +++ b/protocol/serializer/topic_data.go @@ -48,7 +48,7 @@ func writeTopicData(path string, messages []string, logger *logger.Logger) error return fmt.Errorf("error writing file to %s: %w", path, err) } - logger.Debugf(" - Wrote file to: %s", path) + logger.Debugf(" - Wrote file to: %s", path) return nil } @@ -60,6 +60,6 @@ func writePartitionMetadata(path string, version int, topicID string, logger *lo return fmt.Errorf("error writing partition metadata file: %w", err) } - logger.Debugf(" - Wrote file to: %s", path) + logger.Debugf(" - Wrote file to: %s", path) return nil }