Refactor cluster metadata serialization and add custom byte matching logic #33

Merged · 12 commits · Oct 24, 2024
7 changes: 1 addition & 6 deletions Makefile
@@ -26,7 +26,7 @@ test_concurrent_requests_with_kafka: build
dist/main.out


test_current:
test_all:
make test_base_with_kafka
make test_concurrent_requests_with_kafka
make test_describe_topic_partitions_with_kafka
@@ -37,11 +37,6 @@ test_fetch_with_kafka: build
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"gs0\",\"tester_log_prefix\":\"stage-F1\",\"title\":\"Stage #F1: API Version with Fetch Key\"}, {\"slug\":\"dh6\",\"tester_log_prefix\":\"stage-F2\",\"title\":\"Stage #F2: Fetch with no topics\"}, {\"slug\":\"hn6\",\"tester_log_prefix\":\"stage-F3\",\"title\":\"Stage #F3: Fetch with unknown topic\"}, {\"slug\":\"cm4\",\"tester_log_prefix\":\"stage-F4\",\"title\":\"Stage #F4: Fetch with empty topic\"}, {\"slug\":\"eg2\",\"tester_log_prefix\":\"stage-F5\",\"title\":\"Stage #F5: Single Fetch from Disk\"}, {\"slug\":\"fd8\",\"tester_log_prefix\":\"stage-F6\",\"title\":\"Stage #F6: Multi Fetch from Disk\"}]" \
dist/main.out

test_75_with_kafka: build
CODECRAFTERS_REPOSITORY_DIR=./internal/test_helpers/pass_all \
CODECRAFTERS_TEST_CASES_JSON="[{\"slug\":\"fd8\",\"tester_log_prefix\":\"stage-F6\",\"title\":\"Stage #F6: Multi Fetch from Disk\"}]" \
dist/main.out

test:
TESTER_DIR=$(shell pwd) go test -v ./internal/ -failfast --count=1

54 changes: 54 additions & 0 deletions internal/assertions/fetch_response_assertion.go
@@ -6,6 +6,8 @@ import (

"github.com/codecrafters-io/kafka-tester/protocol"
kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder"
"github.com/codecrafters-io/tester-utils/bytes_diff_visualizer"
"github.com/codecrafters-io/tester-utils/logger"
)

@@ -220,6 +222,46 @@ func (a *FetchResponseAssertion) assertRecords(expectedRecords []kafkaapi.Record
return a
}

func (a *FetchResponseAssertion) AssertRecordBatchBytes() *FetchResponseAssertion {
if a.err != nil {
return a
}

actualRecordBatches := []kafkaapi.RecordBatch{}
for _, topic := range a.ActualValue.TopicResponses {
for _, partition := range topic.PartitionResponses {
actualRecordBatches = append(actualRecordBatches, partition.RecordBatches...)
}
}

expectedRecordBatches := []kafkaapi.RecordBatch{}
for _, topic := range a.ExpectedValue.TopicResponses {
for _, partition := range topic.PartitionResponses {
expectedRecordBatches = append(expectedRecordBatches, partition.RecordBatches...)
}
}

expectedRecordBatchBytes := encodeRecordBatches(expectedRecordBatches)
actualRecordBatchBytes := encodeRecordBatches(actualRecordBatches)
// Byte comparison of expected vs actual RecordBatch bytes.
// Since we write these batches to disk and expect users not to change their values,
// a simple byte comparison is sufficient here.
if !bytes.Equal(expectedRecordBatchBytes, actualRecordBatchBytes) {
result := bytes_diff_visualizer.VisualizeByteDiff(expectedRecordBatchBytes, actualRecordBatchBytes)
a.logger.Errorf("")
for _, line := range result {
a.logger.Errorf(line)
}
a.logger.Errorf("")
a.err = fmt.Errorf("RecordBatch bytes do not match with the contents on disk")
return a
}

a.logger.Successf("✓ RecordBatch bytes match with the contents on disk")
return a
}

func (a FetchResponseAssertion) Run() error {
// firstLevelFields: ["ThrottleTimeMs", "ErrorCode", "SessionID"]
// secondLevelFields (Topics): ["Topic"]
@@ -228,3 +270,15 @@ func (a FetchResponseAssertion) Run() error {
// fifthLevelFields (Records): ["Value"]
return a.err
}

func encodeRecordBatches(recordBatches []kafkaapi.RecordBatch) []byte {
// Encodes the given RecordBatches using encoder.RealEncoder
// and returns the resulting bytes.

encoder := realencoder.RealEncoder{}
encoder.Init(make([]byte, 4096))
for _, recordBatch := range recordBatches {
recordBatch.Encode(&encoder)
}
return encoder.Bytes()[:encoder.Offset()]
}
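
The new assertion boils down to a simple pattern: serialize both the expected and actual RecordBatches with the same encoder, compare the raw bytes, and render a byte-level diff when they differ. Below is a minimal standalone sketch of that pattern, reusing the encoder and visualizer calls from the diff above; the FirstTimestamp values are placeholders, not data taken from the tester.

```go
package main

import (
	"bytes"
	"fmt"

	kafkaapi "github.com/codecrafters-io/kafka-tester/protocol/api"
	realencoder "github.com/codecrafters-io/kafka-tester/protocol/encoder"
	"github.com/codecrafters-io/tester-utils/bytes_diff_visualizer"
)

// encode serializes a slice of RecordBatches into their wire format,
// mirroring encodeRecordBatches above.
func encode(recordBatches []kafkaapi.RecordBatch) []byte {
	encoder := realencoder.RealEncoder{}
	encoder.Init(make([]byte, 4096))
	for _, recordBatch := range recordBatches {
		recordBatch.Encode(&encoder)
	}
	return encoder.Bytes()[:encoder.Offset()]
}

func main() {
	// In the tester, "expected" comes from what was written to disk and
	// "actual" from the broker's Fetch response; these values are placeholders.
	expected := []kafkaapi.RecordBatch{{FirstTimestamp: 1726045973899}}
	actual := []kafkaapi.RecordBatch{{FirstTimestamp: 1726045973900}}

	expectedBytes := encode(expected)
	actualBytes := encode(actual)

	if !bytes.Equal(expectedBytes, actualBytes) {
		// VisualizeByteDiff returns printable lines highlighting where the
		// two byte sequences diverge.
		for _, line := range bytes_diff_visualizer.VisualizeByteDiff(expectedBytes, actualBytes) {
			fmt.Println(line)
		}
	}
}
```

Because the tester itself writes these batches to disk and expects them to come back unmodified, any byte-level mismatch points directly at the user's serialization, which is why a strict bytes.Equal check is acceptable here.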
10 changes: 4 additions & 6 deletions internal/stagef5.go
@@ -109,8 +109,8 @@ func testFetchWithSingleMessage(stageHarness *test_case_harness.TestCaseHarness)
Magic: 0,
Attributes: 0,
LastOffsetDelta: 0,
FirstTimestamp: 0,
MaxTimestamp: 0,
FirstTimestamp: 1726045973899,
MaxTimestamp: 1726045973899,
ProducerId: 0,
ProducerEpoch: 0,
BaseSequence: 0,
@@ -120,7 +120,7 @@ func testFetchWithSingleMessage(stageHarness *test_case_harness.TestCaseHarness)
Attributes: 0,
TimestampDelta: 0,
OffsetDelta: 0,
Key: []byte{},
Key: nil,
Value: []byte(common.MESSAGE1),
Headers: []kafkaapi.RecordHeader{},
},
@@ -134,11 +134,9 @@ func testFetchWithSingleMessage(stageHarness *test_case_harness.TestCaseHarness)
},
}

// Byte to Byte comparison for RecordBatches
// Wire up ByteDiffVisualizer

return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, logger).
AssertBody([]string{"ThrottleTimeMs", "ErrorCode"}).
AssertTopics([]string{"Topic"}, []string{"ErrorCode", "PartitionIndex"}, []string{"BaseOffset"}, []string{"Value"}).
AssertRecordBatchBytes().
Run()
}
9 changes: 5 additions & 4 deletions internal/stagef6.go
@@ -109,8 +109,8 @@ func testFetchMultipleMessages(stageHarness *test_case_harness.TestCaseHarness)
Magic: 0,
Attributes: 0,
LastOffsetDelta: 0,
FirstTimestamp: 0,
MaxTimestamp: 0,
FirstTimestamp: 1726045973899,
MaxTimestamp: 1726045973899,
ProducerId: 0,
ProducerEpoch: 0,
BaseSequence: 0,
@@ -133,8 +133,8 @@ func testFetchMultipleMessages(stageHarness *test_case_harness.TestCaseHarness)
Magic: 0,
Attributes: 0,
LastOffsetDelta: 0,
FirstTimestamp: 0,
MaxTimestamp: 0,
FirstTimestamp: 1726045973899,
MaxTimestamp: 1726045973899,
ProducerId: 0,
ProducerEpoch: 0,
BaseSequence: 0,
@@ -161,5 +161,6 @@ func testFetchMultipleMessages(stageHarness *test_case_harness.TestCaseHarness)
return assertions.NewFetchResponseAssertion(*responseBody, expectedFetchResponse, logger).
AssertBody([]string{"ThrottleTimeMs", "ErrorCode"}).
AssertTopics([]string{"Topic"}, []string{"ErrorCode", "PartitionIndex"}, []string{"BaseOffset"}, []string{"Value"}).
AssertRecordBatchBytes().
Run()
}
6 changes: 4 additions & 2 deletions internal/stages_test.go
@@ -34,7 +34,7 @@ func TestStages(t *testing.T) {
NormalizeOutputFunc: normalizeTesterOutput,
},
"fetch_pass": {
StageSlugs: []string{"gs0", "dh6", "hn6", "cm4"},
StageSlugs: []string{"gs0", "dh6", "hn6", "cm4", "eg2", "fd8"},
CodePath: "./test_helpers/pass_all",
ExpectedExitCode: 0,
StdoutFixturePath: "./test_helpers/fixtures/fetch/pass",
@@ -60,13 +60,15 @@ func normalizeTesterOutput(testerOutput []byte) []byte {
"Name": {regexp.MustCompile(`✓ Topic Name: [0-9A-Za-z]{3}`)},
"UUID": {regexp.MustCompile(`✓ Topic UUID: [0-9]{8}-[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{12}`)},
"value_length": {regexp.MustCompile(`- .value_length \([0-9]{1,}\)`)},
"value": {regexp.MustCompile(`- .value \("[A-Za-z0-9 !]{1,}"\)`)},
"value": {regexp.MustCompile(`- .[vV]alue \("[A-Za-z0-9 !]{1,}"\)`)},
"name": {regexp.MustCompile(`- .name \([A-Za-z -]{1,}\)`)},
"topic_name": {regexp.MustCompile(`- .topic_name \([A-Za-z0-9 ]{1,}\)`)},
"next_cursor": {regexp.MustCompile(`- .next_cursor \(\{[A-Za-z0-9 ]{1,}\}\)`)},
"Messages": {regexp.MustCompile(`✓ Messages: \["[A-Za-z !]{1,}"\]`)},
"Topic Name": {regexp.MustCompile(`✓ TopicResponse\[[0-9]{1,}\] Topic Name: [A-Za-z -]{3,}`)},
"Topic UUID": {regexp.MustCompile(`✓ TopicResponse\[[0-9]{1,}\] Topic UUID: [0-9 -]{1,}`)},
"Record Value": {regexp.MustCompile(`✓ Record\[[0-9]{1,}\] Value: [A-Za-z0-9 !]{1,}`)},
"RecordBatch BaseOffset": {regexp.MustCompile(`✓ RecordBatch\[[0-9]{1,}\] BaseOffset: [0-9]{1,}`)},
}

for replacement, regexes := range replacements {
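
The diff is cut off here, but the remainder of normalizeTesterOutput presumably applies each regex and substitutes the map key, so that volatile values (offsets, UUIDs, record contents) do not break fixture comparisons. A hypothetical sketch of that normalization pattern, assuming a plain ReplaceAll loop (the loop body and exact replacement text are not shown in the diff):

```go
package main

import (
	"fmt"
	"regexp"
)

// normalizeOutput replaces volatile substrings in tester output with stable
// placeholders so the output can be compared against fixtures. The use of the
// map key as the replacement text is an assumption, not the repository's code.
func normalizeOutput(testerOutput []byte) []byte {
	replacements := map[string][]*regexp.Regexp{
		"RecordBatch BaseOffset": {regexp.MustCompile(`✓ RecordBatch\[[0-9]{1,}\] BaseOffset: [0-9]{1,}`)},
	}

	for replacement, regexes := range replacements {
		for _, regex := range regexes {
			testerOutput = regex.ReplaceAll(testerOutput, []byte(replacement))
		}
	}

	return testerOutput
}

func main() {
	out := []byte("✓ RecordBatch[0] BaseOffset: 42")
	fmt.Println(string(normalizeOutput(out))) // prints "RecordBatch BaseOffset"
}
```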