diff --git a/internal/stage5.go b/internal/stage5.go index 198f1b1..2b73f2c 100644 --- a/internal/stage5.go +++ b/internal/stage5.go @@ -13,24 +13,25 @@ import ( func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { b := kafka_executable.NewKafkaExecutable(stageHarness) - if err := b.Run(); err != nil { + err := serializer.GenerateLogDirs(logger.GetQuietLogger(""), true) + if err != nil { return err } - quietLogger := logger.GetQuietLogger("") - logger := stageHarness.Logger - err := serializer.GenerateLogDirs(quietLogger, true) - if err != nil { + stageLogger := stageHarness.Logger + if err := b.Run(); err != nil { return err } correlationId := getRandomCorrelationId() broker := protocol.NewBroker("localhost:9092") - if err := broker.ConnectWithRetries(b, logger); err != nil { + if err := broker.ConnectWithRetries(b, stageLogger); err != nil { return err } - defer broker.Close() + defer func(broker *protocol.Broker) { + _ = broker.Close() + }(broker) request := kafkaapi.ApiVersionsRequest{ Header: kafkaapi.RequestHeader{ @@ -47,16 +48,16 @@ func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { } message := kafkaapi.EncodeApiVersionsRequest(&request) - logger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) - logger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) + stageLogger.Infof("Sending \"ApiVersions\" (version: %v) request (Correlation id: %v)", request.Header.ApiVersion, request.Header.CorrelationId) + stageLogger.Debugf("Hexdump of sent \"ApiVersions\" request: \n%v\n", GetFormattedHexdump(message)) response, err := broker.SendAndReceive(message) if err != nil { return err } - logger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) + stageLogger.Debugf("Hexdump of received \"ApiVersions\" response: \n%v\n", GetFormattedHexdump(response.RawBytes)) - responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, logger) + responseHeader, responseBody, err := kafkaapi.DecodeApiVersionsHeaderAndResponse(response.Payload, 3, stageLogger) if err != nil { return err } @@ -64,17 +65,17 @@ func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { if responseHeader.CorrelationId != correlationId { return fmt.Errorf("Expected Correlation ID to be %v, got %v", correlationId, responseHeader.CorrelationId) } - logger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) + stageLogger.Successf("✓ Correlation ID: %v", responseHeader.CorrelationId) if responseBody.ErrorCode != 0 { return fmt.Errorf("Expected Error code to be 0, got %v", responseBody.ErrorCode) } - logger.Successf("✓ Error code: 0 (NO_ERROR)") + stageLogger.Successf("✓ Error code: 0 (NO_ERROR)") if len(responseBody.ApiKeys) < 1 { return fmt.Errorf("Expected API keys array to be non-empty") } - logger.Successf("✓ API keys array is non-empty") + stageLogger.Successf("✓ API keys array is non-empty") foundAPIKey := false MAX_VERSION := int16(4) @@ -82,7 +83,7 @@ func testAPIVersion(stageHarness *test_case_harness.TestCaseHarness) error { if apiVersionKey.ApiKey == 18 { foundAPIKey = true if apiVersionKey.MaxVersion >= MAX_VERSION { - logger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION) + stageLogger.Successf("✓ API version %v is supported for API_VERSIONS", MAX_VERSION) } else { return fmt.Errorf("Expected API version %v to be supported for API_VERSIONS, got %v", MAX_VERSION, apiVersionKey.MaxVersion) }