diff --git a/internal/stage1.go b/internal/stage1.go index f4d1d66..23ce88a 100644 --- a/internal/stage1.go +++ b/internal/stage1.go @@ -10,14 +10,13 @@ import ( func testBindToPort(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } @@ -26,5 +25,5 @@ func testBindToPort(stageHarness *test_case_harness.TestCaseHarness) error { Retries: 15, } - return bindTestCase.Run(b, logger) + return bindTestCase.Run(b, stageLogger) } diff --git a/internal/stage2.go b/internal/stage2.go index 537992b..284ddea 100644 --- a/internal/stage2.go +++ b/internal/stage2.go @@ -16,22 +16,23 @@ import ( func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) correlationId := 7 @@ -50,8 +51,8 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness) } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) err = broker.Send(message) if err != nil { @@ -61,13 +62,13 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness) if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response)) decoder := realdecoder.RealDecoder{} decoder.Init(response) - logger.UpdateSecondaryPrefix("Decoder") + stageLogger.UpdateSecondaryPrefix("Decoder") - logger.Debugf("- .Response") + stageLogger.Debugf("- .Response") messageLength, err := decoder.GetInt32() if err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -76,9 +77,9 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness) } return err } - protocol.LogWithIndentation(logger, 1, "- .message_length (%d)", messageLength) + protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength) - logger.Debugf("- .ResponseHeader") + stageLogger.Debugf("- .ResponseHeader") responseCorrelationId, err := decoder.GetInt32() if err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -87,14 +88,14 @@ func testHardcodedCorrelationId(stageHarness *test_case_harness.TestCaseHarness) } return err } - protocol.LogWithIndentation(logger, 1, "- .correlation_id (%d)", responseCorrelationId) - logger.ResetSecondaryPrefix() + protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId) + stageLogger.ResetSecondaryPrefix() if responseCorrelationId != int32(correlationId) { return fmt.Errorf("Expected Correlation ID to be %v, got %v", int32(correlationId), responseCorrelationId) } - logger.Successf("✓ Correlation ID: %v", responseCorrelationId) + stageLogger.Successf("✓ Correlation ID: %v", responseCorrelationId) return nil } diff --git a/internal/stage3.go b/internal/stage3.go index 1bc3fbe..eec9aab 100644 --- a/internal/stage3.go +++ b/internal/stage3.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "github.com/codecrafters-io/tester-utils/logger" realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder" @@ -10,28 +11,28 @@ import ( kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api" "github.com/codecrafters-io/kafka-tester/protocol/errors" "github.com/codecrafters-io/kafka-tester/protocol/serializer" - "github.com/codecrafters-io/tester-utils/logger" "github.com/codecrafters-io/tester-utils/test_case_harness" ) func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) correlationId := getRandomCorrelationId() @@ -50,8 +51,8 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error { } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) err = broker.Send(message) if err != nil { @@ -61,13 +62,13 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error { if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response)) decoder := realdecoder.RealDecoder{} decoder.Init(response) - logger.UpdateSecondaryPrefix("Decoder") + stageLogger.UpdateSecondaryPrefix("Decoder") - logger.Debugf("- .Response") + stageLogger.Debugf("- .Response") messageLength, err := decoder.GetInt32() if err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -76,9 +77,9 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error { } return err } - protocol.LogWithIndentation(logger, 1, "- .message_length (%d)", messageLength) + protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength) - logger.Debugf("- .ResponseHeader") + stageLogger.Debugf("- .ResponseHeader") responseCorrelationId, err := decoder.GetInt32() if err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -87,14 +88,14 @@ func testCorrelationId(stageHarness *test_case_harness.TestCaseHarness) error { } return err } - protocol.LogWithIndentation(logger, 1, "- .correlation_id (%d)", responseCorrelationId) - logger.ResetSecondaryPrefix() + protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId) + stageLogger.ResetSecondaryPrefix() if responseCorrelationId != correlationId { return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseCorrelationId) } - logger.Successf("✓ Correlation ID: %v", responseCorrelationId) + stageLogger.Successf("✓ Correlation ID: %v", responseCorrelationId) return nil } diff --git a/internal/stage4.go b/internal/stage4.go index 7d343d1..624a82a 100644 --- a/internal/stage4.go +++ b/internal/stage4.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "github.com/codecrafters-io/tester-utils/logger" "github.com/codecrafters-io/kafka-tester/internal/kafka_executable" "github.com/codecrafters-io/kafka-tester/protocol" @@ -9,20 +10,18 @@ import ( realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder" "github.com/codecrafters-io/kafka-tester/protocol/errors" "github.com/codecrafters-io/kafka-tester/protocol/serializer" - "github.com/codecrafters-io/tester-utils/logger" "github.com/codecrafters-io/tester-utils/test_case_harness" ) func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } @@ -30,10 +29,12 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er apiVersion := getInvalidAPIVersion() broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.ApiVersionsRequest{ Header: kafkaapi.RequestHeader{ @@ -50,8 +51,8 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) err = broker.Send(message) if err != nil { @@ -61,13 +62,13 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response)) decoder := realdecoder.RealDecoder{} decoder.Init(response) - logger.UpdateSecondaryPrefix("Decoder") + stageLogger.UpdateSecondaryPrefix("Decoder") - logger.Debugf("- .Response") + stageLogger.Debugf("- .Response") messageLength, err := decoder.GetInt32() if err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -76,9 +77,9 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er } return err } - protocol.LogWithIndentation(logger, 1, "- .message_length (%d)", messageLength) + protocol.LogWithIndentation(stageLogger, 1, "- .message_length (%d)", messageLength) - logger.Debugf("- .ResponseHeader") + stageLogger.Debugf("- .ResponseHeader") responseCorrelationId, err := decoder.GetInt32() if err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { @@ -87,7 +88,7 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er } return err } - protocol.LogWithIndentation(logger, 1, "- .correlation_id (%d)", responseCorrelationId) + protocol.LogWithIndentation(stageLogger, 1, "- .correlation_id (%d)", responseCorrelationId) errorCode, err := decoder.GetInt16() if err != nil { @@ -97,20 +98,20 @@ func testAPIVersionErrorCase(stageHarness *test_case_harness.TestCaseHarness) er } return err } - protocol.LogWithIndentation(logger, 1, "- .error_code (%d)", errorCode) - logger.ResetSecondaryPrefix() + protocol.LogWithIndentation(stageLogger, 1, "- .error_code (%d)", errorCode) + stageLogger.ResetSecondaryPrefix() if responseCorrelationId != correlationId { return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseCorrelationId) } - logger.Successf("✓ Correlation ID: %v", responseCorrelationId) + stageLogger.Successf("✓ Correlation ID: %v", responseCorrelationId) if errorCode != 35 { return fmt.Errorf("Expected Error code to be 35, got %v", errorCode) } - logger.Successf("✓ Error code: 35 (UNSUPPORTED_VERSION)") + stageLogger.Successf("✓ Error code: 35 (UNSUPPORTED_VERSION)") return nil } diff --git a/internal/stage5.go b/internal/stage5.go index 198f1b1..2b73f2c 100644 --- a/internal/stage5.go +++ b/internal/stage5.go @@ -13,24 +13,25 @@ import ( func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.ApiVersionsRequest{ Header: kafkaapi.RequestHeader{ @@ -47,16 +48,16 @@ func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, logger) + responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, stageLogger) if err != nil { return err } @@ -64,17 +65,17 @@ func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { if responseHeader.CorrelationId != correlationId { return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseHeader.CorrelationId) } - logger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) + stageLogger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) if responseBody.ErrorCode != 0 { return fmt.Errorf("Expected Error code to be 0, got %v", responseBody.ErrorCode) } - logger.Successf("✓ Error code: 0 (NO_ERROR)") + stageLogger.Successf("✓ Error code: 0 (NO_ERROR)") if len(responseBody.ApiKeys) < 1 { return fmt.Errorf("Expected API keys array to be non-empty") } - logger.Successf("✓ API keys array is non-empty") + stageLogger.Successf("✓ API keys array is non-empty") foundAPIKey := false MAX_VERSION := int16(4) @@ -82,7 +83,7 @@ func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { if apiVersionKey.ApiKey == 18 { foundAPIKey = true if apiVersionKey.MaxVersion >= MAX_VERSION { - logger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION) + stageLogger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION) } else { return fmt.Errorf("Expected API version %v to be supported for API_VERSIONS, got %v", MAX_VERSION, apiVersionKey.MaxVersion) } diff --git a/internal/stagec1.go b/internal/stagec1.go index 059ff01..2565ea5 100644 --- a/internal/stagec1.go +++ b/internal/stagec1.go @@ -2,34 +2,35 @@ package internal import ( "fmt" + "github.com/codecrafters-io/tester-utils/logger" "github.com/codecrafters-io/kafka-tester/internal/kafka_executable" "github.com/codecrafters-io/kafka-tester/protocol" kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api" "github.com/codecrafters-io/kafka-tester/protocol/serializer" - "github.com/codecrafters-io/tester-utils/logger" "github.com/codecrafters-io/tester-utils/random" "github.com/codecrafters-io/tester-utils/test_case_harness" ) func testSequentialRequests(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) requestCount := random.RandomInt(2, 5) for i := 0; i < requestCount; i++ { @@ -49,16 +50,16 @@ func testSequentialRequests(stageHarness *test_case_harness.TestCaseHarness) err } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending request %v of %v: \"ApiVersions\" (version: %v) request (Correlation id: %v)", i+1, requestCount, request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending request %v of %v: \"ApiVersions\" (version: %v) request (Correlation id: %v)", i+1, requestCount, request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, logger) + responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, stageLogger) if err != nil { return err } @@ -66,17 +67,17 @@ func testSequentialRequests(stageHarness *test_case_harness.TestCaseHarness) err if responseHeader.CorrelationId != correlationId { return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseHeader.CorrelationId) } - logger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) + stageLogger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) if responseBody.ErrorCode != 0 { return fmt.Errorf("Expected Error code to be 0, got %v", responseBody.ErrorCode) } - logger.Successf("✓ Error code: 0 (NO_ERROR)") + stageLogger.Successf("✓ Error code: 0 (NO_ERROR)") if len(responseBody.ApiKeys) < 1 { return fmt.Errorf("Expected API keys array to include atleast 1 key (API_VERSIONS), got %v", len(responseBody.ApiKeys)) } - logger.Successf("✓ API keys array is non-empty") + stageLogger.Successf("✓ API keys array is non-empty") foundAPIKey := false MAX_VERSION_APIVERSION := int16(4) @@ -84,7 +85,7 @@ func testSequentialRequests(stageHarness *test_case_harness.TestCaseHarness) err if apiVersionKey.ApiKey == 18 { foundAPIKey = true if apiVersionKey.MaxVersion >= MAX_VERSION_APIVERSION { - logger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION_APIVERSION) + stageLogger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION_APIVERSION) } else { return fmt.Errorf("Expected API version %v to be supported for API_VERSIONS, got %v", MAX_VERSION_APIVERSION, apiVersionKey.MaxVersion) } @@ -95,7 +96,7 @@ func testSequentialRequests(stageHarness *test_case_harness.TestCaseHarness) err return fmt.Errorf("Expected APIVersionsResponseKey to be present for API key 18 (API_VERSIONS)") } - logger.Successf("✓ Test %v of %v: Passed", i+1, requestCount) + stageLogger.Successf("✓ Test %v of %v: Passed", i+1, requestCount) } return nil diff --git a/internal/stagec2.go b/internal/stagec2.go index 51b9f96..16c9759 100644 --- a/internal/stagec2.go +++ b/internal/stagec2.go @@ -15,14 +15,13 @@ import ( func testConcurrentRequests(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } @@ -32,11 +31,19 @@ func testConcurrentRequests(stageHarness *test_case_harness.TestCaseHarness) err for i := 0; i < clientCount; i++ { clients[i] = protocol.NewBroker("localhost:9092") - if err := clients[i].ConnectWithRetries(b, logger); err != nil { + if err := clients[i].ConnectWithRetries(b, stageLogger); err != nil { return err } } + defer func() { + for _, client := range clients { + if client != nil { + _ = client.Close() + } + } + }() + for i, client := range clients { correlationIds[i] = int32(random.RandomInt(-math.MaxInt32, math.MaxInt32)) request := kafkaapi.ApiVersionsRequest{ @@ -54,8 +61,8 @@ func testConcurrentRequests(stageHarness *test_case_harness.TestCaseHarness) err } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending request %v of %v: \"ApiVersions\" (version: %v) request (Correlation id: %v)", i+1, clientCount, request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending request %v of %v: \"ApiVersions\" (version: %v) request (Correlation id: %v)", i+1, clientCount, request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) err := client.Send(message) if err != nil { @@ -71,9 +78,9 @@ func testConcurrentRequests(stageHarness *test_case_harness.TestCaseHarness) err if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, logger) + responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, stageLogger) if err != nil { return err } @@ -81,17 +88,17 @@ func testConcurrentRequests(stageHarness *test_case_harness.TestCaseHarness) err if responseHeader.CorrelationId != correlationIds[j] { return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationIds[j], responseHeader.CorrelationId) } - logger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) + stageLogger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) if responseBody.ErrorCode != 0 { return fmt.Errorf("Expected Error code to be 0, got %v", responseBody.ErrorCode) } - logger.Successf("✓ Error code: 0 (NO_ERROR)") + stageLogger.Successf("✓ Error code: 0 (NO_ERROR)") if len(responseBody.ApiKeys) < 1 { return fmt.Errorf("Expected API keys array to include atleast 1 key (API_VERSIONS), got %v", len(responseBody.ApiKeys)) } - logger.Successf("✓ API keys array is non-empty") + stageLogger.Successf("✓ API keys array is non-empty") foundAPIKey := false MAX_VERSION_APIVERSION := int16(4) @@ -99,7 +106,7 @@ func testConcurrentRequests(stageHarness *test_case_harness.TestCaseHarness) err if apiVersionKey.ApiKey == 18 { foundAPIKey = true if apiVersionKey.MaxVersion >= MAX_VERSION_APIVERSION { - logger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION_APIVERSION) + stageLogger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION_APIVERSION) } else { return fmt.Errorf("Expected API version %v to be supported for API_VERSIONS, got %v", MAX_VERSION_APIVERSION, apiVersionKey.MaxVersion) } @@ -110,11 +117,7 @@ func testConcurrentRequests(stageHarness *test_case_harness.TestCaseHarness) err return fmt.Errorf("Expected APIVersionsResponseKey to be present for API key 18 (API_VERSIONS)") } - logger.Successf("✓ Test %v of %v: Passed", j+1, clientCount) - } - - for _, client := range clients { - client.Close() + stageLogger.Successf("✓ Test %v of %v: Passed", j+1, clientCount) } return nil diff --git a/internal/stagef1.go b/internal/stagef1.go index 2f52784..61bc55f 100644 --- a/internal/stagef1.go +++ b/internal/stagef1.go @@ -10,26 +10,27 @@ import ( "github.com/codecrafters-io/tester-utils/test_case_harness" ) -func testAPIVersionwFetchKey(stageHarness *test_case_harness.TestCaseHarness) error { +func testAPIVersionWithFetchKey(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), false) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, false) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.ApiVersionsRequest{ Header: kafkaapi.RequestHeader{ @@ -46,16 +47,16 @@ func testAPIVersionwFetchKey(stageHarness *test_case_harness.TestCaseHarness) er } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, logger) + responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, stageLogger) if err != nil { return err } @@ -63,7 +64,7 @@ func testAPIVersionwFetchKey(stageHarness *test_case_harness.TestCaseHarness) er expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -84,7 +85,7 @@ func testAPIVersionwFetchKey(stageHarness *test_case_harness.TestCaseHarness) er }, } - if err = assertions.NewApiVersionsResponseAssertion(*responseBody, expectedApiVersionResponse).Evaluate([]string{"ErrorCode"}, true, logger); err != nil { + if err = assertions.NewApiVersionsResponseAssertion(*responseBody, expectedApiVersionResponse).Evaluate([]string{"ErrorCode"}, true, stageLogger); err != nil { return err } diff --git a/internal/stagef2.go b/internal/stagef2.go index 5f0786f..280cb95 100644 --- a/internal/stagef2.go +++ b/internal/stagef2.go @@ -11,23 +11,24 @@ import ( func testFetchWithNoTopics(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + stageLogger := stageHarness.Logger + err := serializer.GenerateLogDirs(stageLogger, false) + if err != nil { return err } - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(logger, false) - if err != nil { + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() - broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.FetchRequest{ Header: kafkaapi.RequestHeader{ @@ -50,16 +51,16 @@ func testFetchWithNoTopics(stageHarness *test_case_harness.TestCaseHarness) erro } message := kafkaapi.EncodeFetchRequest(&request) - logger.Infof("Sending \"Fetch\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"Fetch\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"Fetch\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"Fetch\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"Fetch\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"Fetch\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeFetchHeaderAndResponse(response.Payload, 16, logger) + responseHeader, responseBody, err := kafkaapi.DecodeFetchHeaderAndResponse(response.Payload, 16, stageLogger) if err != nil { return err } @@ -67,7 +68,7 @@ func testFetchWithNoTopics(stageHarness *test_case_harness.TestCaseHarness) erro expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -77,7 +78,7 @@ func testFetchWithNoTopics(stageHarness *test_case_harness.TestCaseHarness) erro SessionID: 0, TopicResponses: []kafkaapi.TopicResponse{}, } - return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, logger). + return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, stageLogger). AssertBody([]string{"ThrottleTimeMs", "ErrorCode"}). AssertNoTopics(). Run() diff --git a/internal/stagef3.go b/internal/stagef3.go index f60c856..cb77abf 100644 --- a/internal/stagef3.go +++ b/internal/stagef3.go @@ -10,15 +10,15 @@ import ( "github.com/codecrafters-io/tester-utils/test_case_harness" ) -func testFetchWithUnkownTopicID(stageHarness *test_case_harness.TestCaseHarness) error { +func testFetchWithUnknownTopicID(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + stageLogger := stageHarness.Logger + err := serializer.GenerateLogDirs(stageLogger, false) + if err != nil { return err } - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(logger, false) - if err != nil { + if err := b.Run(); err != nil { return err } @@ -27,10 +27,12 @@ func testFetchWithUnkownTopicID(stageHarness *test_case_harness.TestCaseHarness) // ToDo: Research on what is NULL v Empty arrays broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.FetchRequest{ Header: kafkaapi.RequestHeader{ @@ -67,16 +69,16 @@ func testFetchWithUnkownTopicID(stageHarness *test_case_harness.TestCaseHarness) } message := kafkaapi.EncodeFetchRequest(&request) - logger.Infof("Sending \"Fetch\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"Fetch\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"Fetch\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"Fetch\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"Fetch\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"Fetch\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeFetchHeaderAndResponse(response.Payload, 16, logger) + responseHeader, responseBody, err := kafkaapi.DecodeFetchHeaderAndResponse(response.Payload, 16, stageLogger) if err != nil { return err } @@ -84,7 +86,7 @@ func testFetchWithUnkownTopicID(stageHarness *test_case_harness.TestCaseHarness) expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -105,7 +107,7 @@ func testFetchWithUnkownTopicID(stageHarness *test_case_harness.TestCaseHarness) }, } - return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, logger). + return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, stageLogger). AssertBody([]string{"ThrottleTimeMs", "ErrorCode"}). AssertTopics([]string{"Topic"}, []string{"ErrorCode", "PartitionIndex"}, nil, nil). Run() diff --git a/internal/stagef4.go b/internal/stagef4.go index cbbf9af..f9f70c3 100644 --- a/internal/stagef4.go +++ b/internal/stagef4.go @@ -12,23 +12,24 @@ import ( func testFetchNoMessages(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { - return err - } - logger := stageHarness.Logger err := serializer.GenerateLogDirs(logger, false) if err != nil { return err } - correlationId := getRandomCorrelationId() + if err := b.Run(); err != nil { + return err + } + correlationId := getRandomCorrelationId() broker := protocol.NewBroker("localhost:9092") if err := broker.ConnectWithRetries(b, logger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.FetchRequest{ Header: kafkaapi.RequestHeader{ diff --git a/internal/stagef5.go b/internal/stagef5.go index 4c43cc3..2ca114f 100644 --- a/internal/stagef5.go +++ b/internal/stagef5.go @@ -12,23 +12,24 @@ import ( func testFetchWithSingleMessage(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { - return err - } - logger := stageHarness.Logger err := serializer.GenerateLogDirs(logger, false) if err != nil { return err } - correlationId := getRandomCorrelationId() + if err := b.Run(); err != nil { + return err + } + correlationId := getRandomCorrelationId() broker := protocol.NewBroker("localhost:9092") if err := broker.ConnectWithRetries(b, logger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.FetchRequest{ Header: kafkaapi.RequestHeader{ diff --git a/internal/stagef6.go b/internal/stagef6.go index 0fe74b2..407c46f 100644 --- a/internal/stagef6.go +++ b/internal/stagef6.go @@ -12,23 +12,24 @@ import ( func testFetchMultipleMessages(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { - return err - } - logger := stageHarness.Logger err := serializer.GenerateLogDirs(logger, false) if err != nil { return err } - correlationId := getRandomCorrelationId() + if err := b.Run(); err != nil { + return err + } + correlationId := getRandomCorrelationId() broker := protocol.NewBroker("localhost:9092") if err := broker.ConnectWithRetries(b, logger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.FetchRequest{ Header: kafkaapi.RequestHeader{ diff --git a/internal/stagep1.go b/internal/stagep1.go index 0c6420e..db1373a 100644 --- a/internal/stagep1.go +++ b/internal/stagep1.go @@ -10,26 +10,26 @@ import ( "github.com/codecrafters-io/tester-utils/test_case_harness" ) -func testAPIVersionwDescribeTopicPartitions(stageHarness *test_case_harness.TestCaseHarness) error { +func testAPIVersionWithDescribeTopicPartitions(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() - broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.ApiVersionsRequest{ Header: kafkaapi.RequestHeader{ @@ -46,16 +46,16 @@ func testAPIVersionwDescribeTopicPartitions(stageHarness *test_case_harness.Test } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, logger) + responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, stageLogger) if err != nil { return err } @@ -63,7 +63,7 @@ func testAPIVersionwDescribeTopicPartitions(stageHarness *test_case_harness.Test expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -84,7 +84,7 @@ func testAPIVersionwDescribeTopicPartitions(stageHarness *test_case_harness.Test }, } - if err = assertions.NewApiVersionsResponseAssertion(*responseBody, expectedApiVersionResponse).Evaluate([]string{"ErrorCode"}, true, logger); err != nil { + if err = assertions.NewApiVersionsResponseAssertion(*responseBody, expectedApiVersionResponse).Evaluate([]string{"ErrorCode"}, true, stageLogger); err != nil { return err } diff --git a/internal/stagep2.go b/internal/stagep2.go index 0840c5f..002e599 100644 --- a/internal/stagep2.go +++ b/internal/stagep2.go @@ -13,24 +13,24 @@ import ( func testDTPartitionWithUnknownTopic(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() - broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.DescribeTopicPartitionsRequest{ Header: kafkaapi.RequestHeader{ @@ -50,16 +50,16 @@ func testDTPartitionWithUnknownTopic(stageHarness *test_case_harness.TestCaseHar } message := kafkaapi.EncodeDescribeTopicPartitionsRequest(&request) - logger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, logger) + responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, stageLogger) if err != nil { return err } @@ -67,7 +67,7 @@ func testDTPartitionWithUnknownTopic(stageHarness *test_case_harness.TestCaseHar expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -83,7 +83,7 @@ func testDTPartitionWithUnknownTopic(stageHarness *test_case_harness.TestCaseHar }, } - return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, logger). + return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, stageLogger). AssertBody([]string{"ThrottleTimeMs"}). AssertTopics([]string{"ErrorCode", "Name", "TopicID"}, []string{}). Run() diff --git a/internal/stagep3.go b/internal/stagep3.go index 57ad6ce..911332d 100644 --- a/internal/stagep3.go +++ b/internal/stagep3.go @@ -12,23 +12,24 @@ import ( func testDTPartitionWithTopicAndSinglePartition(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + stageLogger := stageHarness.Logger + err := serializer.GenerateLogDirs(stageLogger, true) + if err != nil { return err } - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(logger, true) - if err != nil { + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() - broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.DescribeTopicPartitionsRequest{ Header: kafkaapi.RequestHeader{ @@ -48,16 +49,16 @@ func testDTPartitionWithTopicAndSinglePartition(stageHarness *test_case_harness. } message := kafkaapi.EncodeDescribeTopicPartitionsRequest(&request) - logger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, logger) + responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, stageLogger) if err != nil { return err } @@ -65,7 +66,7 @@ func testDTPartitionWithTopicAndSinglePartition(stageHarness *test_case_harness. expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -92,7 +93,7 @@ func testDTPartitionWithTopicAndSinglePartition(stageHarness *test_case_harness. }, } - return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, logger). + return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, stageLogger). AssertBody([]string{"ThrottleTimeMs"}). AssertTopics([]string{"ErrorCode", "Name", "TopicID"}, []string{}). Run() diff --git a/internal/stagep4.go b/internal/stagep4.go index bd98d92..c2f1d42 100644 --- a/internal/stagep4.go +++ b/internal/stagep4.go @@ -12,23 +12,24 @@ import ( func testDTPartitionWithTopicAndMultiplePartitions2(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + stageLogger := stageHarness.Logger + err := serializer.GenerateLogDirs(stageLogger, true) + if err != nil { return err } - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(logger, true) - if err != nil { + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() - broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.DescribeTopicPartitionsRequest{ Header: kafkaapi.RequestHeader{ @@ -48,16 +49,16 @@ func testDTPartitionWithTopicAndMultiplePartitions2(stageHarness *test_case_harn } message := kafkaapi.EncodeDescribeTopicPartitionsRequest(&request) - logger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, logger) + responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, stageLogger) if err != nil { return err } @@ -65,7 +66,7 @@ func testDTPartitionWithTopicAndMultiplePartitions2(stageHarness *test_case_harn expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -104,7 +105,7 @@ func testDTPartitionWithTopicAndMultiplePartitions2(stageHarness *test_case_harn }, } - return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, logger). + return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, stageLogger). AssertBody([]string{"ThrottleTimeMs"}). AssertTopics([]string{"ErrorCode", "Name", "TopicID"}, []string{"ErrorCode", "PartitionIndex"}). Run() diff --git a/internal/stagep5.go b/internal/stagep5.go index c123764..95f9d1b 100644 --- a/internal/stagep5.go +++ b/internal/stagep5.go @@ -12,20 +12,24 @@ import ( func testDTPartitionWithTopics(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + stageLogger := stageHarness.Logger + err := serializer.GenerateLogDirs(stageLogger, true) + if err != nil { return err } - logger := stageHarness.Logger - serializer.GenerateLogDirs(logger, true) + if err := b.Run(); err != nil { + return err + } correlationId := getRandomCorrelationId() - broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.DescribeTopicPartitionsRequest{ Header: kafkaapi.RequestHeader{ @@ -53,16 +57,16 @@ func testDTPartitionWithTopics(stageHarness *test_case_harness.TestCaseHarness) // bar -> baz -> foo message := kafkaapi.EncodeDescribeTopicPartitionsRequest(&request) - logger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"DescribeTopicPartitions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"DescribeTopicPartitions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"DescribeTopicPartitions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, logger) + responseHeader, responseBody, err := kafkaapi.DecodeDescribeTopicPartitionsHeaderAndResponse(response.Payload, stageLogger) if err != nil { return err } @@ -70,7 +74,7 @@ func testDTPartitionWithTopics(stageHarness *test_case_harness.TestCaseHarness) expectedResponseHeader := kafkaapi.ResponseHeader{ CorrelationId: correlationId, } - if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, logger); err != nil { + if err = assertions.NewResponseHeaderAssertion(*responseHeader, expectedResponseHeader).Evaluate([]string{"CorrelationId"}, stageLogger); err != nil { return err } @@ -145,7 +149,7 @@ func testDTPartitionWithTopics(stageHarness *test_case_harness.TestCaseHarness) }, } - return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, logger). + return assertions.NewDescribeTopicPartitionsResponseAssertion(*responseBody, expectedDescribeTopicPartitionsResponse, stageLogger). AssertBody([]string{"ThrottleTimeMs"}). AssertTopics([]string{"ErrorCode", "Name", "TopicID"}, []string{"ErrorCode", "PartitionIndex"}). Run() diff --git a/internal/test_helpers/fixtures/describe_topic_partitions/pass b/internal/test_helpers/fixtures/describe_topic_partitions/pass index c07ebdd..7429e19 100644 --- a/internal/test_helpers/fixtures/describe_topic_partitions/pass +++ b/internal/test_helpers/fixtures/describe_topic_partitions/pass @@ -383,15 +383,15 @@ Debug = true [stage-4] -----+-------------------------------------------------+----------------- [stage-4] 0000 | 00 00 00 31 00 4b 00 00 60 5a 05 cd 00 0c 6b 61 | ...1.K..`Z....ka [stage-4] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 12 75 6e 6b | fka-tester...unk -[stage-4] 0020 | 6e 6f 77 6e 2d 74 6f 70 69 63 2d 70 61 7a 00 00 | nown-topic-paz.. +[stage-4] 0020 | 6e 6f 77 6e 2d 74 6f 70 69 63 2d 71 75 7a 00 00 | nown-topic-quz.. [stage-4] 0030 | 00 00 01 ff 00 | ..... [stage-4]  [stage-4] Hexdump of received "DescribeTopicPartitions" response:  [stage-4] Idx | Hex | ASCII [stage-4] -----+-------------------------------------------------+----------------- [stage-4] 0000 | 00 00 00 37 60 5a 05 cd 00 00 00 00 00 02 00 03 | ...7`Z.......... -[stage-4] 0010 | 12 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 70 | .unknown-topic-p -[stage-4] 0020 | 61 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | az.............. +[stage-4] 0010 | 12 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 71 | .unknown-topic-q +[stage-4] 0020 | 75 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | uz.............. [stage-4] 0030 | 00 00 00 01 00 00 0d f8 00 ff 00 | ........... [stage-4]  [stage-4] [Decoder] - .ResponseHeader @@ -402,7 +402,7 @@ Debug = true [stage-4] [Decoder]  - .topic.length (1) [stage-4] [Decoder]  - .Topics[0] [stage-4] [Decoder]  - .error_code (3) -[stage-4] [Decoder]  - .name (unknown-topic-paz) +[stage-4] [Decoder]  - .name (unknown-topic-quz) [stage-4] [Decoder]  - .topic_id (00000000-0000-0000-0000-000000000000) [stage-4] [Decoder]  - .is_internal (false) [stage-4] [Decoder]  - .num_partitions (0) @@ -413,14 +413,13 @@ Debug = true [stage-4] ✓ Correlation ID: 1616512461 [stage-4] ✓ Throttle Time: 0 [stage-4]  ✓ TopicResponse[0] Error code: 3 -[stage-4]  ✓ TopicResponse[0] Topic Name: unknown-topic-paz +[stage-4]  ✓ TopicResponse[0] Topic Name: unknown-topic-quz [stage-4]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-0000-0000-000000000000 [stage-4] Test passed. [stage-4] Terminating program [stage-4] Program terminated successfully [stage-3] Running tests for Stage #3: ea7 -[stage-3] $ ./your_program.sh /tmp/server.properties [stage-3] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-3] [Serializer]  - Wrote file to: /tmp/server.properties [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties @@ -428,6 +427,7 @@ Debug = true [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-3] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-3] $ ./your_program.sh /tmp/server.properties [stage-3] Connecting to broker at: localhost:9092 [stage-3] Connection to broker at localhost:9092 successful [stage-3] Sending "DescribeTopicPartitions" (version: 0) request (Correlation id: 96710698) @@ -443,7 +443,7 @@ Debug = true [stage-3] -----+-------------------------------------------------+----------------- [stage-3] 0000 | 00 00 00 45 05 c3 b0 2a 00 00 00 00 00 02 00 00 | ...E...*........ [stage-3] 0010 | 04 62 61 72 00 00 00 00 00 00 40 00 80 00 00 00 | .bar......@..... -[stage-3] 0020 | 00 00 00 42 00 02 00 00 00 00 00 00 00 00 00 01 | ...B............ +[stage-3] 0020 | 00 00 00 20 00 02 00 00 00 00 00 00 00 00 00 01 | ... ............ [stage-3] 0030 | 00 00 00 00 02 00 00 00 01 02 00 00 00 01 01 01 | ................ [stage-3] 0040 | 01 00 00 00 0d f8 00 ff 00 | ......... [stage-3]  @@ -456,7 +456,7 @@ Debug = true [stage-3] [Decoder]  - .Topics[0] [stage-3] [Decoder]  - .error_code (0) [stage-3] [Decoder]  - .name (bar) -[stage-3] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000042) +[stage-3] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000020) [stage-3] [Decoder]  - .is_internal (false) [stage-3] [Decoder]  - .num_partitions (1) [stage-3] [Decoder]  - .Partitions[0] @@ -478,13 +478,12 @@ Debug = true [stage-3] ✓ Throttle Time: 0 [stage-3]  ✓ TopicResponse[0] Error code: 0 [stage-3]  ✓ TopicResponse[0] Topic Name: bar -[stage-3]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000042 +[stage-3]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000020 [stage-3] Test passed. [stage-3] Terminating program [stage-3] Program terminated successfully [stage-2] Running tests for Stage #2: ku4 -[stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-2] [Serializer]  - Wrote file to: /tmp/server.properties [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties @@ -492,6 +491,7 @@ Debug = true [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-2] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] Connecting to broker at: localhost:9092 [stage-2] Connection to broker at localhost:9092 successful [stage-2] Sending "DescribeTopicPartitions" (version: 0) request (Correlation id: 254678266) @@ -499,15 +499,15 @@ Debug = true [stage-2] Idx | Hex | ASCII [stage-2] -----+-------------------------------------------------+----------------- [stage-2] 0000 | 00 00 00 23 00 4b 00 00 0f 2e 14 fa 00 0c 6b 61 | ...#.K........ka -[stage-2] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 04 66 6f 6f | fka-tester...foo +[stage-2] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 04 70 61 78 | fka-tester...pax [stage-2] 0020 | 00 00 00 00 02 ff 00 | ....... [stage-2]  [stage-2] Hexdump of received "DescribeTopicPartitions" response:  [stage-2] Idx | Hex | ASCII [stage-2] -----+-------------------------------------------------+----------------- [stage-2] 0000 | 00 00 00 61 0f 2e 14 fa 00 00 00 00 00 02 00 00 | ...a............ -[stage-2] 0010 | 04 66 6f 6f 00 00 00 00 00 00 40 00 80 00 00 00 | .foo......@..... -[stage-2] 0020 | 00 00 00 32 00 03 00 00 00 00 00 00 00 00 00 01 | ...2............ +[stage-2] 0010 | 04 70 61 78 00 00 00 00 00 00 40 00 80 00 00 00 | .pax......@..... +[stage-2] 0020 | 00 00 00 15 00 03 00 00 00 00 00 00 00 00 00 01 | ................ [stage-2] 0030 | 00 00 00 00 02 00 00 00 01 02 00 00 00 01 01 01 | ................ [stage-2] 0040 | 01 00 00 00 00 00 00 01 00 00 00 01 00 00 00 00 | ................ [stage-2] 0050 | 02 00 00 00 01 02 00 00 00 01 01 01 01 00 00 00 | ................ @@ -521,8 +521,8 @@ Debug = true [stage-2] [Decoder]  - .topic.length (1) [stage-2] [Decoder]  - .Topics[0] [stage-2] [Decoder]  - .error_code (0) -[stage-2] [Decoder]  - .name (foo) -[stage-2] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000032) +[stage-2] [Decoder]  - .name (pax) +[stage-2] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000015) [stage-2] [Decoder]  - .is_internal (false) [stage-2] [Decoder]  - .num_partitions (2) [stage-2] [Decoder]  - .Partitions[0] @@ -554,8 +554,8 @@ Debug = true [stage-2] ✓ Correlation ID: 254678266 [stage-2] ✓ Throttle Time: 0 [stage-2]  ✓ TopicResponse[0] Error code: 0 -[stage-2]  ✓ TopicResponse[0] Topic Name: foo -[stage-2]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000032 +[stage-2]  ✓ TopicResponse[0] Topic Name: pax +[stage-2]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000015 [stage-2]  ✓ PartitionResponse[0] Error code: 0 [stage-2]  ✓ PartitionResponse[0] Partition Index: 0 [stage-2]  ✓ PartitionResponse[1] Error code: 0 @@ -565,7 +565,6 @@ Debug = true [stage-2] Program terminated successfully [stage-1] Running tests for Stage #1: wq2 -[stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-1] [Serializer]  - Wrote file to: /tmp/server.properties [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties @@ -573,6 +572,7 @@ Debug = true [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-1] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] Connecting to broker at: localhost:9092 [stage-1] Connection to broker at localhost:9092 successful [stage-1] Sending "DescribeTopicPartitions" (version: 0) request (Correlation id: 1893237013) @@ -581,7 +581,7 @@ Debug = true [stage-1] -----+-------------------------------------------------+----------------- [stage-1] 0000 | 00 00 00 2d 00 4b 00 00 70 d8 81 15 00 0c 6b 61 | ...-.K..p.....ka [stage-1] 0010 | 66 6b 61 2d 74 65 73 74 65 72 00 04 04 62 61 72 | fka-tester...bar -[stage-1] 0020 | 00 04 62 61 7a 00 04 66 6f 6f 00 00 00 00 04 ff | ..baz..foo...... +[stage-1] 0020 | 00 04 66 6f 6f 00 04 70 61 78 00 00 00 00 04 ff | ..foo..pax...... [stage-1] 0030 | 00 | . [stage-1]  [stage-1] Hexdump of received "DescribeTopicPartitions" response:  @@ -589,14 +589,14 @@ Debug = true [stage-1] -----+-------------------------------------------------+----------------- [stage-1] 0000 | 00 00 00 d3 70 d8 81 15 00 00 00 00 00 04 00 00 | ....p........... [stage-1] 0010 | 04 62 61 72 00 00 00 00 00 00 40 00 80 00 00 00 | .bar......@..... -[stage-1] 0020 | 00 00 00 42 00 02 00 00 00 00 00 00 00 00 00 01 | ...B............ +[stage-1] 0020 | 00 00 00 20 00 02 00 00 00 00 00 00 00 00 00 01 | ... ............ [stage-1] 0030 | 00 00 00 00 02 00 00 00 01 02 00 00 00 01 01 01 | ................ -[stage-1] 0040 | 01 00 00 00 0d f8 00 00 00 04 62 61 7a 00 00 00 | ..........baz... -[stage-1] 0050 | 00 00 00 40 00 80 00 00 00 00 00 00 76 00 02 00 | ...@........v... +[stage-1] 0040 | 01 00 00 00 0d f8 00 00 00 04 66 6f 6f 00 00 00 | ..........foo... +[stage-1] 0050 | 00 00 00 40 00 80 00 00 00 00 00 00 96 00 02 00 | ...@............ [stage-1] 0060 | 00 00 00 00 00 00 00 00 01 00 00 00 00 02 00 00 | ................ [stage-1] 0070 | 00 01 02 00 00 00 01 01 01 01 00 00 00 0d f8 00 | ................ -[stage-1] 0080 | 00 00 04 66 6f 6f 00 00 00 00 00 00 40 00 80 00 | ...foo......@... -[stage-1] 0090 | 00 00 00 00 00 32 00 03 00 00 00 00 00 00 00 00 | .....2.......... +[stage-1] 0080 | 00 00 04 70 61 78 00 00 00 00 00 00 40 00 80 00 | ...pax......@... +[stage-1] 0090 | 00 00 00 00 00 15 00 03 00 00 00 00 00 00 00 00 | ................ [stage-1] 00a0 | 00 01 00 00 00 00 02 00 00 00 01 02 00 00 00 01 | ................ [stage-1] 00b0 | 01 01 01 00 00 00 00 00 00 01 00 00 00 01 00 00 | ................ [stage-1] 00c0 | 00 00 02 00 00 00 01 02 00 00 00 01 01 01 01 00 | ................ @@ -611,7 +611,7 @@ Debug = true [stage-1] [Decoder]  - .Topics[0] [stage-1] [Decoder]  - .error_code (0) [stage-1] [Decoder]  - .name (bar) -[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000042) +[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000020) [stage-1] [Decoder]  - .is_internal (false) [stage-1] [Decoder]  - .num_partitions (1) [stage-1] [Decoder]  - .Partitions[0] @@ -629,8 +629,8 @@ Debug = true [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] [Decoder]  - .Topics[1] [stage-1] [Decoder]  - .error_code (0) -[stage-1] [Decoder]  - .name (baz) -[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000076) +[stage-1] [Decoder]  - .name (foo) +[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000096) [stage-1] [Decoder]  - .is_internal (false) [stage-1] [Decoder]  - .num_partitions (1) [stage-1] [Decoder]  - .Partitions[0] @@ -648,8 +648,8 @@ Debug = true [stage-1] [Decoder]  - .TAG_BUFFER [stage-1] [Decoder]  - .Topics[2] [stage-1] [Decoder]  - .error_code (0) -[stage-1] [Decoder]  - .name (foo) -[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000032) +[stage-1] [Decoder]  - .name (pax) +[stage-1] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000015) [stage-1] [Decoder]  - .is_internal (false) [stage-1] [Decoder]  - .num_partitions (2) [stage-1] [Decoder]  - .Partitions[0] @@ -682,17 +682,17 @@ Debug = true [stage-1] ✓ Throttle Time: 0 [stage-1]  ✓ TopicResponse[0] Error code: 0 [stage-1]  ✓ TopicResponse[0] Topic Name: bar -[stage-1]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000042 +[stage-1]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000020 [stage-1]  ✓ PartitionResponse[0] Error code: 0 [stage-1]  ✓ PartitionResponse[0] Partition Index: 0 [stage-1]  ✓ TopicResponse[1] Error code: 0 -[stage-1]  ✓ TopicResponse[1] Topic Name: baz -[stage-1]  ✓ TopicResponse[1] Topic UUID: 00000000-0000-4000-8000-000000000076 +[stage-1]  ✓ TopicResponse[1] Topic Name: foo +[stage-1]  ✓ TopicResponse[1] Topic UUID: 00000000-0000-4000-8000-000000000096 [stage-1]  ✓ PartitionResponse[0] Error code: 0 [stage-1]  ✓ PartitionResponse[0] Partition Index: 0 [stage-1]  ✓ TopicResponse[2] Error code: 0 -[stage-1]  ✓ TopicResponse[2] Topic Name: foo -[stage-1]  ✓ TopicResponse[2] Topic UUID: 00000000-0000-4000-8000-000000000032 +[stage-1]  ✓ TopicResponse[2] Topic Name: pax +[stage-1]  ✓ TopicResponse[2] Topic UUID: 00000000-0000-4000-8000-000000000015 [stage-1]  ✓ PartitionResponse[0] Error code: 0 [stage-1]  ✓ PartitionResponse[0] Partition Index: 0 [stage-1]  ✓ PartitionResponse[1] Error code: 0 diff --git a/internal/test_helpers/fixtures/fetch/pass b/internal/test_helpers/fixtures/fetch/pass index fc8ac00..cfc0653 100644 --- a/internal/test_helpers/fixtures/fetch/pass +++ b/internal/test_helpers/fixtures/fetch/pass @@ -374,22 +374,22 @@ Debug = true [stage-6] Program terminated successfully [stage-5] Running tests for Stage #5: dh6 -[stage-5] $ ./your_program.sh /tmp/server.properties [stage-5] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-5] [Serializer]  - Wrote file to: /tmp/server.properties [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/partition.metadata +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/partition.metadata [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/00000000000000000000.log +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log +[stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/00000000000000000000.log [stage-5] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-5] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-5] $ ./your_program.sh /tmp/server.properties [stage-5] Connecting to broker at: localhost:9092 [stage-5] Connection to broker at localhost:9092 successful [stage-5] Sending "Fetch" (version: 16) request (Correlation id: 1616512461) @@ -404,8 +404,8 @@ Debug = true [stage-5] Hexdump of received "Fetch" response:  [stage-5] Idx | Hex | ASCII [stage-5] -----+-------------------------------------------------+----------------- -[stage-5] 0000 | 00 00 00 11 60 5a 05 cd 00 00 00 00 00 00 00 04 | ....`Z.......... -[stage-5] 0010 | 0f a9 38 01 00 | ..8.. +[stage-5] 0000 | 00 00 00 11 60 5a 05 cd 00 00 00 00 00 00 00 03 | ....`Z.......... +[stage-5] 0010 | 30 d0 a4 01 00 | 0.... [stage-5]  [stage-5] [Decoder] - .ResponseHeader [stage-5] [Decoder]  - .correlation_id (1616512461) @@ -413,7 +413,7 @@ Debug = true [stage-5] [Decoder] - .ResponseBody [stage-5] [Decoder]  - .throttle_time_ms (0) [stage-5] [Decoder]  - .error_code (0) -[stage-5] [Decoder]  - .session_id (68135224) +[stage-5] [Decoder]  - .session_id (53530788) [stage-5] [Decoder]  - .num_responses (0) [stage-5] [Decoder]  - .TAG_BUFFER [stage-5] ✓ Correlation ID: 1616512461 @@ -425,22 +425,22 @@ Debug = true [stage-5] Program terminated successfully [stage-4] Running tests for Stage #4: hn6 -[stage-4] $ ./your_program.sh /tmp/server.properties [stage-4] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-4] [Serializer]  - Wrote file to: /tmp/server.properties [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/partition.metadata [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log +[stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/00000000000000000000.log [stage-4] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-4] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-4] $ ./your_program.sh /tmp/server.properties [stage-4] Connecting to broker at: localhost:9092 [stage-4] Connection to broker at localhost:9092 successful [stage-4] Sending "Fetch" (version: 16) request (Correlation id: 96710698) @@ -450,7 +450,7 @@ Debug = true [stage-4] 0000 | 00 00 00 60 00 01 00 10 05 c3 b0 2a 00 09 6b 61 | ...`.......*..ka [stage-4] 0010 | 66 6b 61 2d 63 6c 69 00 00 00 01 f4 00 00 00 01 | fka-cli......... [stage-4] 0020 | 03 20 00 00 00 00 00 00 00 00 00 00 00 02 00 00 | . .............. -[stage-4] 0030 | 00 00 00 00 00 00 00 00 00 00 00 00 44 86 02 00 | ............D... +[stage-4] 0030 | 00 00 00 00 00 00 00 00 00 00 00 00 43 83 02 00 | ............C... [stage-4] 0040 | 00 00 00 ff ff ff ff 00 00 00 00 00 00 00 00 ff | ................ [stage-4] 0050 | ff ff ff ff ff ff ff ff ff ff ff 00 10 00 00 00 | ................ [stage-4] 0060 | 00 01 01 00 | .... @@ -458,9 +458,9 @@ Debug = true [stage-4] Hexdump of received "Fetch" response:  [stage-4] Idx | Hex | ASCII [stage-4] -----+-------------------------------------------------+----------------- -[stage-4] 0000 | 00 00 00 48 05 c3 b0 2a 00 00 00 00 00 00 00 01 | ...H...*........ -[stage-4] 0010 | 9e d3 25 02 00 00 00 00 00 00 00 00 00 00 00 00 | ..%............. -[stage-4] 0020 | 00 00 44 86 02 00 00 00 00 00 64 ff ff ff ff ff | ..D.......d..... +[stage-4] 0000 | 00 00 00 48 05 c3 b0 2a 00 00 00 00 00 00 00 03 | ...H...*........ +[stage-4] 0010 | 7e 75 e1 02 00 00 00 00 00 00 00 00 00 00 00 00 | ~u.............. +[stage-4] 0020 | 00 00 43 83 02 00 00 00 00 00 64 ff ff ff ff ff | ..C.......d..... [stage-4] 0030 | ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff ff | ................ [stage-4] 0040 | ff ff ff 01 ff ff ff ff 01 00 00 00 | ............ [stage-4]  @@ -470,10 +470,10 @@ Debug = true [stage-4] [Decoder] - .ResponseBody [stage-4] [Decoder]  - .throttle_time_ms (0) [stage-4] [Decoder]  - .error_code (0) -[stage-4] [Decoder]  - .session_id (27185957) +[stage-4] [Decoder]  - .session_id (58619361) [stage-4] [Decoder]  - .num_responses (1) [stage-4] [Decoder]  - .TopicResponse[0] -[stage-4] [Decoder]  - .topic_id (00000000-0000-0000-0000-000000004486) +[stage-4] [Decoder]  - .topic_id (00000000-0000-0000-0000-000000004383) [stage-4] [Decoder]  - .num_partitions (1) [stage-4] [Decoder]  - .PartitionResponse[0] [stage-4] [Decoder]  - .partition_index (0) @@ -490,7 +490,7 @@ Debug = true [stage-4] ✓ Correlation ID: 96710698 [stage-4] ✓ Throttle Time: 0 [stage-4] ✓ Error Code: 0 (NO_ERROR) -[stage-4]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-0000-0000-000000004486 +[stage-4]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-0000-0000-000000004383 [stage-4]  ✓ PartitionResponse[0] Error code: 100 (UNKNOWN_TOPIC_ID) [stage-4]  ✓ PartitionResponse[0] Partition Index: 0 [stage-4]  ✓ RecordBatches: [] @@ -499,22 +499,22 @@ Debug = true [stage-4] Program terminated successfully [stage-3] Running tests for Stage #3: cm4 -[stage-3] $ ./your_program.sh /tmp/server.properties [stage-3] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-3] [Serializer]  - Wrote file to: /tmp/server.properties [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/partition.metadata [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log +[stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/00000000000000000000.log [stage-3] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-3] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-3] $ ./your_program.sh /tmp/server.properties [stage-3] Connecting to broker at: localhost:9092 [stage-3] Connection to broker at localhost:9092 successful [stage-3] Sending "Fetch" (version: 16) request (Correlation id: 254678266) @@ -524,7 +524,7 @@ Debug = true [stage-3] 0000 | 00 00 00 60 00 01 00 10 0f 2e 14 fa 00 09 6b 61 | ...`..........ka [stage-3] 0010 | 66 6b 61 2d 63 6c 69 00 00 00 01 f4 00 00 00 01 | fka-cli......... [stage-3] 0020 | 03 20 00 00 00 00 00 00 00 00 00 00 00 02 00 00 | . .............. -[stage-3] 0030 | 00 00 00 00 40 00 80 00 00 00 00 00 00 14 02 00 | ....@........... +[stage-3] 0030 | 00 00 00 00 40 00 80 00 00 00 00 00 00 96 02 00 | ....@........... [stage-3] 0040 | 00 00 00 ff ff ff ff 00 00 00 00 00 00 00 00 ff | ................ [stage-3] 0050 | ff ff ff ff ff ff ff ff ff ff ff 00 10 00 00 00 | ................ [stage-3] 0060 | 00 01 01 00 | .... @@ -532,9 +532,9 @@ Debug = true [stage-3] Hexdump of received "Fetch" response:  [stage-3] Idx | Hex | ASCII [stage-3] -----+-------------------------------------------------+----------------- -[stage-3] 0000 | 00 00 00 48 0f 2e 14 fa 00 00 00 00 00 00 00 03 | ...H............ -[stage-3] 0010 | 9d 37 90 02 00 00 00 00 00 00 40 00 80 00 00 00 | .7........@..... -[stage-3] 0020 | 00 00 00 14 02 00 00 00 00 00 00 00 00 00 00 00 | ................ +[stage-3] 0000 | 00 00 00 48 0f 2e 14 fa 00 00 00 00 00 00 00 0f | ...H............ +[stage-3] 0010 | 33 98 ca 02 00 00 00 00 00 00 40 00 80 00 00 00 | 3.........@..... +[stage-3] 0020 | 00 00 00 96 02 00 00 00 00 00 00 00 00 00 00 00 | ................ [stage-3] 0030 | 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | ................ [stage-3] 0040 | 00 00 00 00 ff ff ff ff 01 00 00 00 | ............ [stage-3]  @@ -544,10 +544,10 @@ Debug = true [stage-3] [Decoder] - .ResponseBody [stage-3] [Decoder]  - .throttle_time_ms (0) [stage-3] [Decoder]  - .error_code (0) -[stage-3] [Decoder]  - .session_id (60635024) +[stage-3] [Decoder]  - .session_id (255039690) [stage-3] [Decoder]  - .num_responses (1) [stage-3] [Decoder]  - .TopicResponse[0] -[stage-3] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000014) +[stage-3] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000096) [stage-3] [Decoder]  - .num_partitions (1) [stage-3] [Decoder]  - .PartitionResponse[0] [stage-3] [Decoder]  - .partition_index (0) @@ -564,7 +564,7 @@ Debug = true [stage-3] ✓ Correlation ID: 254678266 [stage-3] ✓ Throttle Time: 0 [stage-3] ✓ Error Code: 0 (NO_ERROR) -[stage-3]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000014 +[stage-3]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000096 [stage-3]  ✓ PartitionResponse[0] Error code: 0 (NO_ERROR) [stage-3]  ✓ PartitionResponse[0] Partition Index: 0 [stage-3]  ✓ RecordBatches: [] @@ -573,22 +573,22 @@ Debug = true [stage-3] Program terminated successfully [stage-2] Running tests for Stage #2: eg2 -[stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-2] [Serializer]  - Wrote file to: /tmp/server.properties [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/partition.metadata [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log +[stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/00000000000000000000.log [stage-2] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-2] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-2] $ ./your_program.sh /tmp/server.properties [stage-2] Connecting to broker at: localhost:9092 [stage-2] Connection to broker at localhost:9092 successful [stage-2] Sending "Fetch" (version: 16) request (Correlation id: 1893237013) @@ -598,7 +598,7 @@ Debug = true [stage-2] 0000 | 00 00 00 60 00 01 00 10 70 d8 81 15 00 09 6b 61 | ...`....p.....ka [stage-2] 0010 | 66 6b 61 2d 63 6c 69 00 00 00 01 f4 00 00 00 01 | fka-cli......... [stage-2] 0020 | 03 20 00 00 00 00 00 00 00 00 00 00 00 02 00 00 | . .............. -[stage-2] 0030 | 00 00 00 00 40 00 80 00 00 00 00 00 00 26 02 00 | ....@........&.. +[stage-2] 0030 | 00 00 00 00 40 00 80 00 00 00 00 00 00 20 02 00 | ....@........ .. [stage-2] 0040 | 00 00 00 ff ff ff ff 00 00 00 00 00 00 00 00 ff | ................ [stage-2] 0050 | ff ff ff ff ff ff ff ff ff ff ff 00 10 00 00 00 | ................ [stage-2] 0060 | 00 01 01 00 | .... @@ -606,16 +606,17 @@ Debug = true [stage-2] Hexdump of received "Fetch" response:  [stage-2] Idx | Hex | ASCII [stage-2] -----+-------------------------------------------------+----------------- -[stage-2] 0000 | 00 00 00 98 70 d8 81 15 00 00 00 00 00 00 00 00 | ....p........... -[stage-2] 0010 | 92 cc 59 02 00 00 00 00 00 00 40 00 80 00 00 00 | ..Y.......@..... -[stage-2] 0020 | 00 00 00 26 02 00 00 00 00 00 00 00 00 00 00 00 | ...&............ +[stage-2] 0000 | 00 00 00 a6 70 d8 81 15 00 00 00 00 00 00 00 09 | ....p........... +[stage-2] 0010 | 93 62 0b 02 00 00 00 00 00 00 40 00 80 00 00 00 | .b........@..... +[stage-2] 0020 | 00 00 00 20 02 00 00 00 00 00 00 00 00 00 00 00 | ... ............ [stage-2] 0030 | 00 00 01 00 00 00 00 00 00 00 01 00 00 00 00 00 | ................ -[stage-2] 0040 | 00 00 00 00 ff ff ff ff 51 00 00 00 00 00 00 00 | ........Q....... -[stage-2] 0050 | 00 00 00 00 44 00 00 00 00 02 64 61 7c 4a 00 00 | ....D.....da|J.. +[stage-2] 0040 | 00 00 00 00 ff ff ff ff 5f 00 00 00 00 00 00 00 | ........_....... +[stage-2] 0050 | 00 00 00 00 52 00 00 00 00 02 8b aa 87 2a 00 00 | ....R........*.. [stage-2] 0060 | 00 00 00 00 00 00 01 91 e0 5b 6d 8b 00 00 01 91 | .........[m..... [stage-2] 0070 | e0 5b 6d 8b 00 00 00 00 00 00 00 00 00 00 00 00 | .[m............. -[stage-2] 0080 | 00 00 00 00 00 01 24 00 00 00 01 18 48 65 6c 6c | ......$.....Hell -[stage-2] 0090 | 6f 20 45 61 72 74 68 21 00 00 00 00 | o Earth!.... +[stage-2] 0080 | 00 00 00 00 00 01 40 00 00 00 01 34 48 65 6c 6c | ......@....4Hell +[stage-2] 0090 | 6f 20 52 65 76 65 72 73 65 20 45 6e 67 69 6e 65 | o Reverse Engine +[stage-2] 00a0 | 65 72 69 6e 67 21 00 00 00 00 | ering!.... [stage-2]  [stage-2] [Decoder] - .ResponseHeader [stage-2] [Decoder]  - .correlation_id (1893237013) @@ -623,10 +624,10 @@ Debug = true [stage-2] [Decoder] - .ResponseBody [stage-2] [Decoder]  - .throttle_time_ms (0) [stage-2] [Decoder]  - .error_code (0) -[stage-2] [Decoder]  - .session_id (9620569) +[stage-2] [Decoder]  - .session_id (160653835) [stage-2] [Decoder]  - .num_responses (1) [stage-2] [Decoder]  - .TopicResponse[0] -[stage-2] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000026) +[stage-2] [Decoder]  - .topic_id (00000000-0000-4000-8000-000000000020) [stage-2] [Decoder]  - .num_partitions (1) [stage-2] [Decoder]  - .PartitionResponse[0] [stage-2] [Decoder]  - .partition_index (0) @@ -636,13 +637,13 @@ Debug = true [stage-2] [Decoder]  - .log_start_offset (0) [stage-2] [Decoder]  - .num_aborted_transactions (0) [stage-2] [Decoder]  - .preferred_read_replica (-1) -[stage-2] [Decoder]  - .compact_records_length (80) +[stage-2] [Decoder]  - .compact_records_length (94) [stage-2] [Decoder]  - .RecordBatch[0] [stage-2] [Decoder]  - .base_offset (0) -[stage-2] [Decoder]  - .batch_length (68) +[stage-2] [Decoder]  - .batch_length (82) [stage-2] [Decoder]  - .partition_leader_epoch (0) [stage-2] [Decoder]  - .magic_byte (2) -[stage-2] [Decoder]  - .crc (1684110410) +[stage-2] [Decoder]  - .crc (-1951758550) [stage-2] [Decoder]  - .record_attributes (0) [stage-2] [Decoder]  - .last_offset_delta (0) [stage-2] [Decoder]  - .base_timestamp (1726045973899) @@ -652,14 +653,14 @@ Debug = true [stage-2] [Decoder]  - .base_sequence (0) [stage-2] [Decoder]  - .num_records (1) [stage-2] [Decoder]  - .Record[0] -[stage-2] [Decoder]  - .length (18) +[stage-2] [Decoder]  - .length (32) [stage-2] [Decoder]  - .attributes (0) [stage-2] [Decoder]  - .timestamp_delta (0) [stage-2] [Decoder]  - .offset_delta (0) [stage-2] [Decoder]  - .key_length (-1) [stage-2] [Decoder]  - .key ("") -[stage-2] [Decoder]  - .value_length (12) -[stage-2] [Decoder]  - .value ("Hello Earth!") +[stage-2] [Decoder]  - .value_length (26) +[stage-2] [Decoder]  - .value ("Hello Reverse Engineering!") [stage-2] [Decoder]  - .num_headers (0) [stage-2] [Decoder]  - .TAG_BUFFER [stage-2] [Decoder]  - .TAG_BUFFER @@ -667,33 +668,33 @@ Debug = true [stage-2] ✓ Correlation ID: 1893237013 [stage-2] ✓ Throttle Time: 0 [stage-2] ✓ Error Code: 0 (NO_ERROR) -[stage-2]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000026 +[stage-2]  ✓ TopicResponse[0] Topic UUID: 00000000-0000-4000-8000-000000000020 [stage-2]  ✓ PartitionResponse[0] Error code: 0 (NO_ERROR) [stage-2]  ✓ PartitionResponse[0] Partition Index: 0 [stage-2]  ✓ RecordBatch[0] BaseOffset: 0 -[stage-2]  ✓ Record[0] Value: Hello Earth! +[stage-2]  ✓ Record[0] Value: Hello Reverse Engineering! [stage-2] ✓ RecordBatch bytes match with the contents on disk [stage-2] Test passed. [stage-2] Terminating program [stage-2] Program terminated successfully [stage-1] Running tests for Stage #1: fd8 -[stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] [Serializer] Writing log files to: /tmp/kraft-combined-logs [stage-1] [Serializer]  - Wrote file to: /tmp/server.properties [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/meta.properties [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/.kafka_cleanshutdown [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/partition.metadata [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/partition.metadata -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/partition.metadata +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/partition.metadata [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/partition.metadata [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/bar-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/baz-0/00000000000000000000.log [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-0/00000000000000000000.log -[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/foo-1/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-0/00000000000000000000.log +[stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/pax-1/00000000000000000000.log [stage-1] [Serializer]  - Wrote file to: /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log [stage-1] [Serializer] Finished writing log files to: /tmp/kraft-combined-logs +[stage-1] $ ./your_program.sh /tmp/server.properties [stage-1] Connecting to broker at: localhost:9092 [stage-1] Connection to broker at localhost:9092 successful [stage-1] Sending "Fetch" (version: 16) request (Correlation id: 1698452642) @@ -703,7 +704,7 @@ Debug = true [stage-1] 0000 | 00 00 00 60 00 01 00 10 65 3c 54 a2 00 09 6b 61 | ...`....e Topic ID: %s\n", response.Topics[0].Name, response.Topics[0].TopicID) -} - func EncodeDescribeTopicPartitionsRequest(request *DescribeTopicPartitionsRequest) []byte { encoder := realencoder.RealEncoder{} encoder.Init(make([]byte, 4096)) diff --git a/protocol/api/fetch.go b/protocol/api/fetch.go index 5b30a51..83ac5d8 100644 --- a/protocol/api/fetch.go +++ b/protocol/api/fetch.go @@ -30,7 +30,8 @@ func DecodeFetchHeader(response []byte, version int16, logger *logger.Logger) (* logger.Debugf("- .ResponseHeader") if err := responseHeader.DecodeV1(&decoder, logger, 1); err != nil { if decodingErr, ok := err.(*errors.PacketDecodingError); ok { - return nil, decodingErr.WithAddedContext("Response Header").WithAddedContext("Fetch Response v16") + detailedError := decodingErr.WithAddedContext("Response Header").WithAddedContext("Fetch Response v16") + return nil, decoder.FormatDetailedError(detailedError.Error()) } return nil, err }