Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix random topic ID collisions #37

Merged
merged 4 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 0 additions & 57 deletions protocol/api/api_versions.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,13 @@
package kafkaapi

import (
"fmt"

realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder"
realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder"

"github.com/codecrafters-io/kafka-tester/protocol"
"github.com/codecrafters-io/kafka-tester/protocol/errors"
"github.com/codecrafters-io/tester-utils/logger"
)

func GetAPIVersions(prettyPrint bool) {
broker := protocol.NewBroker("localhost:9092")
if err := broker.Connect(); err != nil {
panic(err)
}
defer broker.Close()

response, err := ApiVersions(broker, &ApiVersionsRequestBody{Version: 4, ClientSoftwareName: "kafka-cli", ClientSoftwareVersion: "0.1"})
if err != nil {
panic(err)
}

if prettyPrint {
PrintAPIVersions(response)
}
}

func EncodeApiVersionsRequest(request *ApiVersionsRequest) []byte {
encoder := realencoder.RealEncoder{}
encoder.Init(make([]byte, 4096))
Expand Down Expand Up @@ -88,40 +68,3 @@ func DecodeApiVersionsHeaderAndResponse(response []byte, version int16, logger *

return &responseHeader, &apiVersionsResponse, nil
}

// ApiVersions returns api version response or error
func ApiVersions(b *protocol.Broker, requestBody *ApiVersionsRequestBody) (*ApiVersionsResponse, error) {
header := RequestHeader{
ApiKey: 18,
ApiVersion: requestBody.Version,
CorrelationId: 0,
ClientId: requestBody.ClientSoftwareName,
}
request := ApiVersionsRequest{
Header: header,
Body: *requestBody,
}
message := EncodeApiVersionsRequest(&request)

response, err := b.SendAndReceive(message)
if err != nil {
return nil, err
}

_, apiVersionsResponse, err := DecodeApiVersionsHeaderAndResponse(response.Payload, requestBody.Version, logger.GetLogger(true, ""))
if err != nil {
return nil, err
}

return apiVersionsResponse, nil
}

func PrintAPIVersions(response *ApiVersionsResponse) {
fmt.Printf("API versions supported by the broker are:\n")
fmt.Println("API Key\tMinVersion\tMaxVersion\t")
apiVersionKeys := response.ApiKeys
// For each API, the broker will return the minimum and maximum supported version
for _, key := range apiVersionKeys {
fmt.Println(key.ApiKey, "\t", key.MinVersion, "\t", key.MaxVersion)
}
}
19 changes: 0 additions & 19 deletions protocol/api/describe_topic_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,6 @@ func EncodeDescribeTopicPartitionsRequest(request *DescribeTopicPartitionsReques
return messageBytes
}

func DecodeDescribeTopicPartitionHeader(response []byte, version int16, logger *logger.Logger) (*ResponseHeader, error) {
decoder := realdecoder.RealDecoder{}
decoder.Init(response)
logger.UpdateSecondaryPrefix("Decoder")
defer logger.ResetSecondaryPrefix()

responseHeader := ResponseHeader{}
logger.Debugf("- .ResponseHeader")
// DescribeTopicPartitions always uses Header v0
if err := responseHeader.DecodeV0(&decoder, logger, 1); err != nil {
if decodingErr, ok := err.(*errors.PacketDecodingError); ok {
return nil, decodingErr.WithAddedContext("Response Header").WithAddedContext("DescribeTopicPartitions v0")
}
return nil, err
}

return &responseHeader, nil
}

// DecodeDescribeTopicPartitionsHeaderAndResponse decodes the header and response
// If an error is encountered while decoding, the returned objects are nil
func DecodeDescribeTopicPartitionsHeaderAndResponse(response []byte, logger *logger.Logger) (*ResponseHeader, *DescribeTopicPartitionsResponse, error) {
Expand Down
79 changes: 0 additions & 79 deletions protocol/api/fetch.go
Original file line number Diff line number Diff line change
@@ -1,53 +1,13 @@
package kafkaapi

import (
"fmt"

realdecoder "github.com/codecrafters-io/kafka-tester/protocol/decoder"
realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder"

"github.com/codecrafters-io/kafka-tester/protocol"
"github.com/codecrafters-io/kafka-tester/protocol/errors"
"github.com/codecrafters-io/tester-utils/logger"
)

func Fetch() {
broker := protocol.NewBroker("localhost:9092")
if err := broker.Connect(); err != nil {
panic(err)
}
defer broker.Close()

_, err := fetch(broker, &FetchRequestBody{
MaxWaitMS: 500,
MinBytes: 1,
MaxBytes: 52428800,
IsolationLevel: 0,
FetchSessionID: 0,
FetchSessionEpoch: 0,
Topics: []Topic{
{
TopicUUID: "0f62a58e-617b-462f-9161-132a1946d66a",
Partitions: []Partition{
{
ID: 0,
CurrentLeaderEpoch: 0,
FetchOffset: 0,
LastFetchedOffset: -1,
LogStartOffset: -1,
PartitionMaxBytes: 1048576,
},
},
},
},
ForgottenTopics: []ForgottenTopic{},
RackID: "",
}, logger.GetLogger(true, ""))
if err != nil {
panic(err)
}
}

func EncodeFetchRequest(request *FetchRequest) []byte {
encoder := realencoder.RealEncoder{}
// bytes.Buffer{}
Expand Down Expand Up @@ -106,42 +66,3 @@ func DecodeFetchHeaderAndResponse(response []byte, version int16, logger *logger

return &responseHeader, &fetchResponse, nil
}

// Fetch returns api version response or error
func fetch(b *protocol.Broker, requestBody *FetchRequestBody, logger *logger.Logger) ([]byte, error) {
request := FetchRequest{
Header: RequestHeader{
ApiKey: 1,
ApiVersion: 16,
CorrelationId: 0,
ClientId: "kafka-tester",
},
Body: *requestBody,
}

message := EncodeFetchRequest(&request)

response, err := b.SendAndReceive(message)
if err != nil {
return nil, err
}

protocol.PrintHexdump(response.RawBytes)

_, fetchResponse, err := DecodeFetchHeaderAndResponse(response.Payload, 16, logger)
if err != nil {
return nil, err
}

for _, topicResponse := range fetchResponse.TopicResponses {
for _, partitionResponse := range topicResponse.PartitionResponses {
for _, recordBatch := range partitionResponse.RecordBatches {
for _, r := range recordBatch.Records {
fmt.Printf("message: %s\n", r.Value)
}
}
}
}

return nil, nil
}
32 changes: 23 additions & 9 deletions protocol/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"fmt"
"slices"
"sort"

"github.com/codecrafters-io/tester-utils/random"
Expand All @@ -20,15 +21,16 @@ const (
)

var (
all_topic_names = []string{"foo", "bar", "baz", "qux", "quz", "pax", "paz", "saz"}
topic_names = GetSortedValues(random.RandomElementsFromArray(all_topic_names, 4))
TOPIC1_NAME = topic_names[0]
TOPIC2_NAME = topic_names[1]
TOPIC3_NAME = topic_names[2]
TOPIC1_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99))
TOPIC2_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99))
TOPIC3_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random.RandomInt(10, 99))
TOPICX_UUID = fmt.Sprintf("00000000-0000-0000-0000-00000000%04d", random.RandomInt(1000, 9999)) // Unknown topic used in requests
all_topic_names = []string{"foo", "bar", "baz", "qux", "quz", "pax", "paz", "saz"}
topic_names = GetSortedValues(random.RandomElementsFromArray(all_topic_names, 4))
TOPIC1_NAME = topic_names[0]
TOPIC2_NAME = topic_names[1]
TOPIC3_NAME = topic_names[2]
random_topic_uuids = getUniqueRandomIntegers(10, 99, 3)
TOPIC1_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[0])
TOPIC2_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[1])
TOPIC3_UUID = fmt.Sprintf("00000000-0000-4000-8000-0000000000%02d", random_topic_uuids[2])
TOPICX_UUID = fmt.Sprintf("00000000-0000-0000-0000-00000000%04d", random.RandomInt(1000, 9999)) // Unknown topic used in requests

TOPIC_UNKOWN_NAME = fmt.Sprintf("unknown-topic-%s", topic_names[3])
TOPIC_UNKOWN_UUID = "00000000-0000-0000-0000-000000000000"
Expand All @@ -46,3 +48,15 @@ func GetSortedValues[T string](values []T) []T {
})
return values
}

func getUniqueRandomIntegers(min, max, count int) []int {
randomInts := []int{}
for i := 0; i < count; i++ {
randomInt := random.RandomInt(min, max)
for slices.Contains(randomInts, randomInt) {
randomInt = random.RandomInt(min, max)
}
randomInts = append(randomInts, randomInt)
}
return randomInts
}
Loading