Skip to content

Commit

Permalink
test: add JSON schema test to schema in multiple subjects test
Browse files Browse the repository at this point in the history
References #854
  • Loading branch information
jjaakola-aiven committed May 14, 2024
1 parent a8e3e4a commit 639092b
Showing 1 changed file with 123 additions and 46 deletions.
169 changes: 123 additions & 46 deletions tests/integration/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from attr import dataclass
from http import HTTPStatus
from karapace.client import Client
from karapace.kafka.producer import KafkaProducer
from karapace.rapu import is_success
from karapace.schema_registry_apis import SchemaErrorMessages
from karapace.schema_type import SchemaType
from karapace.utils import json_encode
from tests.base_testcase import BaseTestCase
from tests.integration.utils.cluster import RegistryDescription
from tests.integration.utils.kafka_server import KafkaServers
from tests.utils import (
Expand Down Expand Up @@ -1079,85 +1082,159 @@ async def assert_schema_versions_failed(client: Client, trail: str, schema_id: i
assert res.status_code == response_code


async def register_schema(registry_async_client: Client, trail, subject: str, schema_str: str) -> Tuple[int, int]:
async def register_schema(
registry_async_client: Client, trail: str, subject: str, schema_str: str, schema_type: SchemaType = SchemaType.AVRO
) -> Tuple[int, int]:
# Register to get the id
payload = {"schema": schema_str}
if schema_type == SchemaType.JSONSCHEMA:
payload["schemaType"] = "JSON"
elif schema_type == SchemaType.PROTOBUF:
payload["schemaType"] = "PROTO"
else:
pass
res = await registry_async_client.post(
f"subjects/{subject}/versions{trail}",
json={"schema": schema_str},
json=payload,
)
assert res.status_code == 200
schema_id = res.json()["id"]

# Get version
res = await registry_async_client.post(
f"subjects/{subject}{trail}",
json={"schema": schema_str},
)
res = await registry_async_client.post(f"subjects/{subject}{trail}", json=payload)
assert res.status_code == 200
assert res.json()["id"] == schema_id
return schema_id, res.json()["version"]


@pytest.mark.parametrize("trail", ["", "/"])
async def test_schema_versions_multiple_subjects_same_schema(registry_async_client: Client, trail: str) -> None:
@dataclass
class MultipleSubjectsSameSchemaTestCase(BaseTestCase):
test_name: str
schema: str
other_schema: str
schema_type: SchemaType


@pytest.mark.parametrize(
"testcase",
[
MultipleSubjectsSameSchemaTestCase(
test_name="Test same AVRO schema on multiple subjects",
schema=json.dumps(
{
"type": "record",
"name": "SimpleTestSchema",
"fields": [
{
"name": "f1",
"type": "string",
},
{
"name": "f2",
"type": "string",
},
],
},
),
other_schema=json.dumps(
{
"type": "record",
"name": "SimpleOtherTestSchema",
"fields": [
{
"name": "f1",
"type": "string",
},
],
},
),
schema_type=SchemaType.AVRO,
),
MultipleSubjectsSameSchemaTestCase(
test_name="Test same JSON schema on multiple subjects",
schema=json.dumps(
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://example.com/product.schema.json",
"title": "SimpleTest",
"description": "Test JSON schema",
"type": "object",
"properties": {
"f1": {
"type": "string",
},
"f2": {
"type": "string",
},
},
},
),
other_schema=json.dumps(
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://example.com/product.schema.json",
"title": "SimpleTestOtherSchema",
"description": "Test JSON schema",
"type": "object",
"properties": {
"other_schema_field": {
"type": "integer",
},
},
}
),
schema_type=SchemaType.JSONSCHEMA,
),
],
)
async def test_schema_versions_multiple_subjects_same_schema(
registry_async_client: Client,
testcase: MultipleSubjectsSameSchemaTestCase,
) -> None:
"""
Tests case where there are multiple subjects with the same schema.
The schema/versions endpoint returns all these subjects.
"""
subject_name_factory = create_subject_name_factory(f"test_schema_versions_multiple_subjects_same_schema-{trail}")
schema_name_factory = create_schema_name_factory(f"test_schema_versions_multiple_subjects_same_schema_{trail}")

schema_1 = {
"type": "record",
"name": schema_name_factory(),
"fields": [
{
"name": "f1",
"type": "string",
},
{
"name": "f2",
"type": "string",
},
],
}
schema_str_1 = json.dumps(schema_1)
schema_2 = {
"type": "record",
"name": schema_name_factory(),
"fields": [
{
"name": "f1",
"type": "string",
}
],
}
schema_str_2 = json.dumps(schema_2)
subject_name_factory = create_subject_name_factory(
f"test_schema_versions_multiple_subjects_same_schema-{testcase.schema_type}"
)

subject_1 = subject_name_factory()
schema_id_1, version_1 = await register_schema(registry_async_client, trail, subject_1, schema_str_1)
schema_id_1, version_1 = await register_schema(
registry_async_client, "", subject_1, testcase.schema, schema_type=testcase.schema_type
)
schema_1_versions = [(subject_1, version_1)]
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)

subject_2 = subject_name_factory()
schema_id_2, version_2 = await register_schema(registry_async_client, trail, subject_2, schema_str_1)
schema_id_2, version_2 = await register_schema(
registry_async_client, "", subject_2, testcase.schema, schema_type=testcase.schema_type
)
schema_1_versions = [(subject_1, version_1), (subject_2, version_2)]
assert schema_id_1 == schema_id_2
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)

subject_3 = subject_name_factory()
schema_id_3, version_3 = await register_schema(registry_async_client, trail, subject_3, schema_str_1)
schema_id_3, version_3 = await register_schema(
registry_async_client, "", subject_3, testcase.schema, schema_type=testcase.schema_type
)
schema_1_versions = [(subject_1, version_1), (subject_2, version_2), (subject_3, version_3)]
assert schema_id_1 == schema_id_3
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)

# subject_4 with different schema to check there are no side effects
subject_4 = subject_name_factory()
schema_id_4, version_4 = await register_schema(registry_async_client, trail, subject_4, schema_str_2)
schema_id_4, version_4 = await register_schema(
registry_async_client, "", subject_4, testcase.other_schema, schema_type=testcase.schema_type
)
schema_2_versions = [(subject_4, version_4)]
assert schema_id_1 != schema_id_4
await assert_schema_versions(registry_async_client, trail, schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, trail, schema_id_4, schema_2_versions)
await assert_schema_versions(registry_async_client, "", schema_id_1, schema_1_versions)
await assert_schema_versions(registry_async_client, "", schema_id_4, schema_2_versions)

res = await registry_async_client.get("subjects")
assert res.status_code == 200
assert res.json() == [subject_1, subject_2, subject_3, subject_4]


@pytest.mark.parametrize("trail", ["", "/"])
Expand Down

0 comments on commit 639092b

Please sign in to comment.