Skip to content

Commit

Permalink
Merge branch 'main' into update-cluster-payload-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-gang authored Oct 28, 2024
2 parents 4d60f38 + 586d4dd commit a5e8a13
Show file tree
Hide file tree
Showing 65 changed files with 6,617 additions and 3,029 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v1
with:
python-version: '3.8'
python-version: '3.9'

- run: make test

Expand All @@ -35,7 +35,7 @@ jobs:
uses: actions/checkout@v2

- name: Set up Go
uses: actions/setup-go@v1
uses: actions/setup-go@v5
with:
go-version: 1.22.x

Expand Down
11 changes: 3 additions & 8 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@ dist/
# misc files
protocol/decoder/error-codes.md
main.go
internal/metadata.go
internal/stage3_test.go
internal/stage2_test.go
protocol/decoder/todo
**/__pycache__/**
internal/test_helpers/ryan/**
internal/assertion.go
log
log2
**tmp**
file
internal/assertions/topic_assertion.go
protocol/api/spec.yml
protocol/api/backupHex.sh
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 CodeCrafters

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
25 changes: 21 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,33 @@ build:

test_base_with_kafka: build
CODECRAFTERS_REPOSITORY_DIR=./internal/test_helpers/pass_all \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"vi6\",\"tester_log_prefix\":\"stage-1\",\"title\":\"Stage #1: Bind to a port\"}, {\"slug\":\"nv3\",\"tester_log_prefix\":\"stage-2\",\"title\":\"Stage #2: Hardcoded Correlation ID\"}, {\"slug\":\"wa6\",\"tester_log_prefix\":\"stage-3\",\"title\":\"Stage #3: Correlation ID\"}, {\"slug\":\"nc5\",\"tester_log_prefix\":\"stage-4\",\"title\":\"Stage #4: API Version Error Case\"}, {\"slug\":\"pv1\",\"tester_log_prefix\":\"stage-5\",\"title\":\"Stage #5: API Version\"}, {\"slug\":\"nh4\",\"tester_log_prefix\":\"stage-C1\",\"title\":\"Stage #C1: Multiple sequential requests from client\"}, {\"slug\":\"sk0\",\"tester_log_prefix\":\"stage-C2\",\"title\":\"Stage #C2: Multiple concurrent requests from client\"}, {\"slug\":\"xy3\",\"tester_log_prefix\":\"stage-D3\",\"title\":\"Stage #D3: Describe Topic Partitions\"}, {\"slug\":\"gs0\",\"tester_log_prefix\":\"stage-F1\",\"title\":\"Stage #F1: API Version with Fetch Key\"}, {\"slug\":\"dh6\",\"tester_log_prefix\":\"stage-F2\",\"title\":\"Stage #F2: Empty Fetch\"}, {\"slug\":\"hn6\",\"tester_log_prefix\":\"stage-F3\",\"title\":\"Stage #F3: Fetch with Empty Topic\"}, {\"slug\":\"cm4\",\"tester_log_prefix\":\"stage-F4\",\"title\":\"Stage #F4: Fetch Error Response\"}, {\"slug\":\"eg2\",\"tester_log_prefix\":\"stage-F5\",\"title\":\"Stage #F5: Single Fetch from Disk\"}, {\"slug\":\"fd8\",\"tester_log_prefix\":\"stage-F6\",\"title\":\"Stage #F6: Multi Fetch from Disk\"}]" \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"vi6\",\"tester_log_prefix\":\"stage-1\",\"title\":\"Stage #1: Bind to a port\"}, {\"slug\":\"nv3\",\"tester_log_prefix\":\"stage-2\",\"title\":\"Stage #2: Hardcoded Correlation ID\"}, {\"slug\":\"wa6\",\"tester_log_prefix\":\"stage-3\",\"title\":\"Stage #3: Correlation ID\"}, {\"slug\":\"nc5\",\"tester_log_prefix\":\"stage-4\",\"title\":\"Stage #4: API Version Error Case\"}, {\"slug\":\"pv1\",\"tester_log_prefix\":\"stage-5\",\"title\":\"Stage #5: API Version\"}]" \
dist/main.out

test_75_with_kafka: build
test_describe_topic_partitions_with_kafka: build
CODECRAFTERS_REPOSITORY_DIR=./internal/test_helpers/pass_all \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"xy3\",\"tester_log_prefix\":\"stage-D3\",\"title\":\"Stage #D3: Describe Topic Partitions\"}]" \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"yk1\",\"tester_log_prefix\":\"stage-P1\",\"title\":\"Stage #P1: API Version with Describe Topic Partitions\"}, {\"slug\":\"vt6\",\"tester_log_prefix\":\"stage-P2\",\"title\":\"Stage #P2: Describe Topic Partitions for unknown topic\"}, {\"slug\":\"ea7\",\"tester_log_prefix\":\"stage-P3\",\"title\":\"Stage #P3: Describe Topic Partitions\"}, {\"slug\":\"ku4\",\"tester_log_prefix\":\"stage-P4\",\"title\":\"Stage #P4: Describe Topic Partitions w Multi partition\"}, {\"slug\":\"wq2\",\"tester_log_prefix\":\"stage-P5\",\"title\":\"Stage #P5: Describe Topic Partitions 2\"}]" \
dist/main.out

test_concurrent_requests_with_kafka: build
CODECRAFTERS_REPOSITORY_DIR=./internal/test_helpers/pass_all \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"nh4\",\"tester_log_prefix\":\"stage-C1\",\"title\":\"Stage #C1: Multiple sequential requests from client\"}, {\"slug\":\"sk0\",\"tester_log_prefix\":\"stage-C2\",\"title\":\"Stage #C2: Multiple concurrent requests from client\"}]" \
dist/main.out


test_all:
make test_base_with_kafka
make test_concurrent_requests_with_kafka
make test_describe_topic_partitions_with_kafka
make test_fetch_with_kafka

test_fetch_with_kafka: build
CODECRAFTERS_REPOSITORY_DIR=./internal/test_helpers/pass_all \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"gs0\",\"tester_log_prefix\":\"stage-F1\",\"title\":\"Stage #F1: API Version with Fetch Key\"}, {\"slug\":\"dh6\",\"tester_log_prefix\":\"stage-F2\",\"title\":\"Stage #F2: Fetch with no topics\"}, {\"slug\":\"hn6\",\"tester_log_prefix\":\"stage-F3\",\"title\":\"Stage #F3: Fetch with unknown topic\"}, {\"slug\":\"cm4\",\"tester_log_prefix\":\"stage-F4\",\"title\":\"Stage #F4: Fetch with empty topic\"}, {\"slug\":\"eg2\",\"tester_log_prefix\":\"stage-F5\",\"title\":\"Stage #F5: Single Fetch from Disk\"}, {\"slug\":\"fd8\",\"tester_log_prefix\":\"stage-F6\",\"title\":\"Stage #F6: Multi Fetch from Disk\"}]" \
dist/main.out

test:
TESTER_DIR=$(shell pwd) go test -v ./internal/
TESTER_DIR=$(shell pwd) go test -v ./internal/ -failfast --count=1

test_and_watch:
onchange '**/*' -- go test -v ./internal/
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module github.com/codecrafters-io/kafka-tester
go 1.22.1

require (
github.com/codecrafters-io/tester-utils v0.2.37
github.com/codecrafters-io/tester-utils v0.2.38
github.com/google/uuid v1.6.0
github.com/stretchr/testify v1.9.0
)

Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
github.com/codecrafters-io/tester-utils v0.2.37 h1:VZ6dJ8HdaYMqShw8+cuBv0N6cATaEc6NSKLoSlp1u0o=
github.com/codecrafters-io/tester-utils v0.2.37/go.mod h1:VgP0WmmRsA8L1urWGMXPW4Zv5jcwHwR0LdKZ8ZAEFT4=
github.com/codecrafters-io/tester-utils v0.2.38 h1:rKxm02A4Ll6gGrwcsGKDHxZExjH4hMbtu2ra7r/E7Lk=
github.com/codecrafters-io/tester-utils v0.2.38/go.mod h1:VgP0WmmRsA8L1urWGMXPW4Zv5jcwHwR0LdKZ8ZAEFT4=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
79 changes: 79 additions & 0 deletions internal/assertions/apiversions_response_assertion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package assertions

import (
"fmt"

kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
"github.com/codecrafters-io/tester-utils/logger"
)

type ApiVersionsResponseAssertion struct {
ActualValue kafkaapi.ApiVersionsResponse
ExpectedValue kafkaapi.ApiVersionsResponse
}

func NewApiVersionsResponseAssertion(actualValue kafkaapi.ApiVersionsResponse, expectedValue kafkaapi.ApiVersionsResponse) ApiVersionsResponseAssertion {
return ApiVersionsResponseAssertion{ActualValue: actualValue, ExpectedValue: expectedValue}
}

var apiKeyNames = map[int16]string{
1: "FETCH",
18: "API_VERSIONS",
75: "DESCRIBE_TOPIC_PARTITIONS",
}

var errorCodes = map[int]string{
0: "NO_ERROR",
3: "UNKNOWN_TOPIC_OR_PARTITION",
35: "UNSUPPORTED_VERSION",
100: "UNKNOWN_TOPIC_ID",
}

func (a ApiVersionsResponseAssertion) Evaluate(fields []string, AssertApiVersionsResponseKey bool, logger *logger.Logger) error {
if Contains(fields, "ErrorCode") {
if a.ActualValue.ErrorCode != a.ExpectedValue.ErrorCode {
return fmt.Errorf("Expected %s to be %d, got %d", "ErrorCode", a.ExpectedValue.ErrorCode, a.ActualValue.ErrorCode)
}

errorCodeName, ok := errorCodes[int(a.ActualValue.ErrorCode)]
if !ok {
errorCodeName = "UNKNOWN"
}
logger.Successf("✓ Error code: %d (%s)", a.ActualValue.ErrorCode, errorCodeName)
}

if AssertApiVersionsResponseKey {
if len(a.ActualValue.ApiKeys) < len(a.ExpectedValue.ApiKeys) {
return fmt.Errorf("Expected API keys array to include atleast %d keys, got %d", len(a.ExpectedValue.ApiKeys), len(a.ActualValue.ApiKeys))
}
logger.Successf("✓ API keys array length: %d", len(a.ActualValue.ApiKeys))

for _, expectedApiVersionKey := range a.ExpectedValue.ApiKeys {
found := false
for _, actualApiVersionKey := range a.ActualValue.ApiKeys {
if actualApiVersionKey.ApiKey == expectedApiVersionKey.ApiKey {
found = true
if actualApiVersionKey.MinVersion > expectedApiVersionKey.MaxVersion {
return fmt.Errorf("Expected min version %v to be < max version %v for %s", actualApiVersionKey.MinVersion, expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey])
}

// anything above or equal to expected minVersion is fine
if actualApiVersionKey.MinVersion < expectedApiVersionKey.MinVersion {
return fmt.Errorf("Expected API version %v to be supported for %s, got %v", expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MaxVersion)
}
logger.Successf("✓ MinVersion for %s is <= %v & >= %v", apiKeyNames[expectedApiVersionKey.ApiKey], expectedApiVersionKey.MaxVersion, expectedApiVersionKey.MinVersion)

if actualApiVersionKey.MaxVersion < expectedApiVersionKey.MaxVersion {
return fmt.Errorf("Expected API version %v to be supported for %s, got %v", expectedApiVersionKey.MaxVersion, apiKeyNames[expectedApiVersionKey.ApiKey], actualApiVersionKey.MaxVersion)
}
logger.Successf("✓ MaxVersion for %s is >= %v", apiKeyNames[expectedApiVersionKey.ApiKey], expectedApiVersionKey.MaxVersion)
}
}
if !found {
return fmt.Errorf("Expected APIVersionsResponseKey array to include API key %d (%s)", expectedApiVersionKey.ApiKey, apiKeyNames[expectedApiVersionKey.ApiKey])
}
}
}

return nil
}
136 changes: 136 additions & 0 deletions internal/assertions/describe_topic_partitions_response_assertion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package assertions

import (
"fmt"

"github.com/codecrafters-io/kafka-tester/protocol"
kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
"github.com/codecrafters-io/tester-utils/logger"
)

type DescribeTopicPartitionsResponseAssertion struct {
ActualValue kafkaapi.DescribeTopicPartitionsResponse
ExpectedValue kafkaapi.DescribeTopicPartitionsResponse
logger *logger.Logger
err error
}

func NewDescribeTopicPartitionsResponseAssertion(actualValue kafkaapi.DescribeTopicPartitionsResponse, expectedValue kafkaapi.DescribeTopicPartitionsResponse, logger *logger.Logger) *DescribeTopicPartitionsResponseAssertion {
return &DescribeTopicPartitionsResponseAssertion{
ActualValue: actualValue,
ExpectedValue: expectedValue,
logger: logger,
}
}

func (a *DescribeTopicPartitionsResponseAssertion) AssertBody(fields []string) *DescribeTopicPartitionsResponseAssertion {
if a.err != nil {
return a
}
if Contains(fields, "ThrottleTimeMs") {
if a.ActualValue.ThrottleTimeMs != a.ExpectedValue.ThrottleTimeMs {
a.err = fmt.Errorf("Expected %s to be %d, got %d", "ThrottleTimeMs", a.ExpectedValue.ThrottleTimeMs, a.ActualValue.ThrottleTimeMs)
return a
}
a.logger.Successf("✓ Throttle Time: %d", a.ActualValue.ThrottleTimeMs)
}

return a
}

func (a *DescribeTopicPartitionsResponseAssertion) AssertTopics(topicFields []string, partitionFields []string) *DescribeTopicPartitionsResponseAssertion {
if a.err != nil {
return a
}

if len(a.ActualValue.Topics) != len(a.ExpectedValue.Topics) {
a.err = fmt.Errorf("Expected %s to be %d, got %d", "topics.length", len(a.ExpectedValue.Topics), len(a.ActualValue.Topics))
return a
}

for i, actualTopic := range a.ActualValue.Topics {
expectedTopic := a.ExpectedValue.Topics[i]
if Contains(topicFields, "ErrorCode") {
if actualTopic.ErrorCode != expectedTopic.ErrorCode {
a.err = fmt.Errorf("Expected %s to be %d, got %d", fmt.Sprintf("TopicResponse[%d] Error Code", i), expectedTopic.ErrorCode, actualTopic.ErrorCode)
return a
}
protocol.SuccessLogWithIndentation(a.logger, 1, "✓ TopicResponse[%d] Error code: %d", i, actualTopic.ErrorCode)
}

if Contains(topicFields, "Name") {
if actualTopic.Name != expectedTopic.Name {
a.err = fmt.Errorf("Expected %s to be %s, got %s", fmt.Sprintf("TopicResponse[%d] Topic Name", i), expectedTopic.Name, actualTopic.Name)
return a
}
protocol.SuccessLogWithIndentation(a.logger, 1, "✓ TopicResponse[%d] Topic Name: %s", i, actualTopic.Name)
}

if Contains(topicFields, "TopicID") {
if actualTopic.TopicID != expectedTopic.TopicID {
a.err = fmt.Errorf("Expected %s to be %s, got %s", fmt.Sprintf("TopicResponse[%d] Topic UUID", i), expectedTopic.TopicID, actualTopic.TopicID)
return a
}
protocol.SuccessLogWithIndentation(a.logger, 1, "✓ TopicResponse[%d] Topic UUID: %s", i, actualTopic.TopicID)
}

if Contains(topicFields, "TopicAuthorizedOperations") {
if actualTopic.TopicAuthorizedOperations != expectedTopic.TopicAuthorizedOperations {
a.err = fmt.Errorf("Expected %s to be %d, got %d", fmt.Sprintf("TopicResponse[%d] Topic Authorized Operations", i), expectedTopic.TopicAuthorizedOperations, actualTopic.TopicAuthorizedOperations)
return a
}
protocol.SuccessLogWithIndentation(a.logger, 1, "✓ TopicResponse[%d] Topic Authorized Operations: %d", i, actualTopic.TopicAuthorizedOperations)
}

expectedPartitions := expectedTopic.Partitions
actualPartitions := actualTopic.Partitions

if (partitionFields) != nil {
a.assertPartitions(expectedPartitions, actualPartitions, partitionFields)
} else {
if len(actualPartitions) != 0 {
a.err = fmt.Errorf("Expected %s to be %d, got %d", "partitions.length", 0, len(actualPartitions))
return a
}
}
}

return a
}

func (a *DescribeTopicPartitionsResponseAssertion) assertPartitions(expectedPartitions []kafkaapi.DescribeTopicPartitionsResponsePartition, actualPartitions []kafkaapi.DescribeTopicPartitionsResponsePartition, fields []string) *DescribeTopicPartitionsResponseAssertion {
if len(actualPartitions) != len(expectedPartitions) {
a.err = fmt.Errorf("Expected %s to be %d, got %d", "partitions.length", len(expectedPartitions), len(actualPartitions))
return a
}

for j, actualPartition := range actualPartitions {
expectedPartition := expectedPartitions[j]

if Contains(fields, "ErrorCode") {
if actualPartition.ErrorCode != expectedPartition.ErrorCode {
a.err = fmt.Errorf("Expected %s to be %d, got %d", fmt.Sprintf("PartitionResponse[%d] Error Code", j), expectedPartition.ErrorCode, actualPartition.ErrorCode)
return a
}
protocol.SuccessLogWithIndentation(a.logger, 2, "✓ PartitionResponse[%d] Error code: %d", j, actualPartition.ErrorCode)
}

if Contains(fields, "PartitionIndex") {
if actualPartition.PartitionIndex != expectedPartition.PartitionIndex {
a.err = fmt.Errorf("Expected %s to be %d, got %d", fmt.Sprintf("Partition Response[%d] Partition Index", j), expectedPartition.PartitionIndex, actualPartition.PartitionIndex)
return a
}
protocol.SuccessLogWithIndentation(a.logger, 2, "✓ PartitionResponse[%d] Partition Index: %d", j, actualPartition.PartitionIndex)
}

}

return nil
}

func (a DescribeTopicPartitionsResponseAssertion) Run() error {
// firstLevelFields: ["ThrottleTimeMs"]
// secondLevelFields (Topics): ["ErrorCode", "Name", "TopicID", "TopicAuthorizedOperations"]
// thirdLevelFields (Partitions): ["ErrorCode, "PartitionIndex"]
return a.err
}
Loading

0 comments on commit a5e8a13

Please sign in to comment.