From 3cf0f16c66867925ef312d2aafdb41a031bb955b Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Tue, 19 Nov 2024 08:40:27 +0100 Subject: [PATCH] added smoke tests against the new rest-proxy standalone module --- .coveragerc | 2 +- .github/workflows/container-smoke-test.yml | 3 + bin/smoke-test-rest-proxy.sh | 20 ++++ container/compose.yml | 28 ++++++ container/start.sh | 16 ++++ mypy.ini | 4 +- src/karapace/karapace_all.py | 2 +- .../__init__.py | 16 ++-- src/rest_proxy/__main__.py | 93 +++++++++++++++++++ .../authentication.py | 0 .../consumer_manager.py | 4 +- .../error_codes.py | 0 .../schema_cache.py | 0 tests/integration/conftest.py | 2 +- tests/integration/test_rest.py | 2 +- tests/integration/test_rest_consumer.py | 2 +- .../__init__.py | 0 .../test_rest_proxy_cluster_metadata_cache.py | 2 +- tests/unit/test_authentication.py | 4 +- tests/unit/test_rest_auth.py | 2 +- 20 files changed, 181 insertions(+), 21 deletions(-) create mode 100755 bin/smoke-test-rest-proxy.sh rename src/{karapace/kafka_rest_apis => rest_proxy}/__init__.py (99%) create mode 100644 src/rest_proxy/__main__.py rename src/{karapace/kafka_rest_apis => rest_proxy}/authentication.py (100%) rename src/{karapace/kafka_rest_apis => rest_proxy}/consumer_manager.py (99%) rename src/{karapace/kafka_rest_apis => rest_proxy}/error_codes.py (100%) rename src/{karapace/kafka_rest_apis => rest_proxy}/schema_cache.py (100%) rename tests/unit/{kafka_rest_apis => rest_proxy}/__init__.py (100%) rename tests/unit/{kafka_rest_apis => rest_proxy}/test_rest_proxy_cluster_metadata_cache.py (99%) diff --git a/.coveragerc b/.coveragerc index 2a6a5d055..2d1c0590c 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,4 +1,4 @@ [run] branch = true relative_files = true -source = src/karapace +source = src/karapace,src/rest_proxy diff --git a/.github/workflows/container-smoke-test.yml b/.github/workflows/container-smoke-test.yml index cced926bd..b875f51c3 100644 --- a/.github/workflows/container-smoke-test.yml +++ b/.github/workflows/container-smoke-test.yml @@ -37,3 +37,6 @@ jobs: - name: Smoke test REST proxy run: bin/smoke-test-rest.sh + + - name: Smoke test REST proxy standalone + run: bin/smoke-test-rest-proxy.sh diff --git a/bin/smoke-test-rest-proxy.sh b/bin/smoke-test-rest-proxy.sh new file mode 100755 index 000000000..57c919eae --- /dev/null +++ b/bin/smoke-test-rest-proxy.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +retries=5 + +for ((i = 0; i <= retries; i++)); do + response=$(curl --silent --verbose --fail http://localhost:8083/topics) + + if [[ $response == '["_schemas","__consumer_offsets"]' ]]; then + echo "Ok!" + break + fi + + if ((i == retries)); then + echo "Still failing after $i retries, giving up." + exit 1 + fi + + echo "Smoke test failed, retrying in 5 seconds ..." + sleep 5 +done diff --git a/container/compose.yml b/container/compose.yml index fa2c53265..f3789fe16 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -111,6 +111,34 @@ services: KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true + karapace-rest-proxy: + image: ghcr.io/aiven-open/karapace:develop + build: + context: .. + dockerfile: container/Dockerfile + entrypoint: + - /bin/bash + - /opt/karapace/start.sh + - rest_proxy + depends_on: + - kafka + - karapace-registry + ports: + - "8083:8083" + environment: + KARAPACE_PORT: 8083 + KARAPACE_HOST: 0.0.0.0 + KARAPACE_ADVERTISED_HOSTNAME: karapace-rest-proxy + KARAPACE_BOOTSTRAP_URI: kafka:29092 + KARAPACE_REGISTRY_HOST: karapace-registry + KARAPACE_REGISTRY_PORT: 8081 + KARAPACE_ADMIN_METADATA_MAX_AGE: 0 + KARAPACE_LOG_LEVEL: WARNING + KARAPACE_STATSD_HOST: statsd-exporter + KARAPACE_STATSD_PORT: 8125 + KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false + KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true + prometheus: image: prom/prometheus volumes: diff --git a/container/start.sh b/container/start.sh index 95ac86aa2..ba69b3f87 100755 --- a/container/start.sh +++ b/container/start.sh @@ -27,6 +27,22 @@ rest) echo "Starting Karapace REST API" exec python3 -m karapace.karapace_all /opt/karapace/rest.config.json ;; +rest_proxy) + # Reexport variables for compatibility + [[ -n ${KARAPACE_REST_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REST_ADVERTISED_HOSTNAME}" + [[ -n ${KARAPACE_REST_BOOTSTRAP_URI+isset} ]] && export KARAPACE_BOOTSTRAP_URI="${KARAPACE_REST_BOOTSTRAP_URI}" + [[ -n ${KARAPACE_REST_REGISTRY_HOST+isset} ]] && export KARAPACE_REGISTRY_HOST="${KARAPACE_REST_REGISTRY_HOST}" + [[ -n ${KARAPACE_REST_REGISTRY_PORT+isset} ]] && export KARAPACE_REGISTRY_PORT="${KARAPACE_REST_REGISTRY_PORT}" + [[ -n ${KARAPACE_REST_HOST+isset} ]] && export KARAPACE_HOST="${KARAPACE_REST_HOST}" + [[ -n ${KARAPACE_REST_PORT+isset} ]] && export KARAPACE_PORT="${KARAPACE_REST_PORT}" + [[ -n ${KARAPACE_REST_ADMIN_METADATA_MAX_AGE+isset} ]] && export KARAPACE_ADMIN_METADATA_MAX_AGE="${KARAPACE_REST_ADMIN_METADATA_MAX_AGE}" + [[ -n ${KARAPACE_REST_LOG_LEVEL+isset} ]] && export KARAPACE_LOG_LEVEL="${KARAPACE_REST_LOG_LEVEL}" + export KARAPACE_REST=1 + echo "{}" >/opt/karapace/rest.config.json + + echo "Starting Karapace REST API" + exec python3 -m rest_proxy /opt/karapace/rest.config.json + ;; registry) # Reexport variables for compatibility [[ -n ${KARAPACE_REGISTRY_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REGISTRY_ADVERTISED_HOSTNAME}" diff --git a/mypy.ini b/mypy.ini index c4ef8efd1..26963e571 100644 --- a/mypy.ini +++ b/mypy.ini @@ -65,10 +65,10 @@ ignore_errors = True [mypy-karapace.serialization] ignore_errors = True -[mypy-karapace.kafka_rest_apis.consumer_manager] +[mypy-rest_proxy.consumer_manager] ignore_errors = True -[mypy-karapace.kafka_rest_apis] +[mypy-rest_proxy] ignore_errors = True # Third-party libraries with no stubs available. Before adding libraries here, diff --git a/src/karapace/karapace_all.py b/src/karapace/karapace_all.py index ccdb96915..049d07c9c 100644 --- a/src/karapace/karapace_all.py +++ b/src/karapace/karapace_all.py @@ -9,10 +9,10 @@ from karapace import version as karapace_version from karapace.config import Config, read_config from karapace.instrumentation.prometheus import PrometheusInstrumentation -from karapace.kafka_rest_apis import KafkaRest from karapace.rapu import RestApp from karapace.schema_registry_apis import KarapaceSchemaRegistryController from karapace.utils import DebugAccessLogger +from rest_proxy import KafkaRest import argparse import logging diff --git a/src/karapace/kafka_rest_apis/__init__.py b/src/rest_proxy/__init__.py similarity index 99% rename from src/karapace/kafka_rest_apis/__init__.py rename to src/rest_proxy/__init__.py index 10675fb23..acf23cd55 100644 --- a/src/karapace/kafka_rest_apis/__init__.py +++ b/src/rest_proxy/__init__.py @@ -23,14 +23,6 @@ from karapace.errors import InvalidSchema from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import AsyncKafkaProducer -from karapace.kafka_rest_apis.authentication import ( - get_auth_config_from_header, - get_expiration_time_from_header, - get_kafka_client_auth_parameters_from_config, -) -from karapace.kafka_rest_apis.consumer_manager import ConsumerManager -from karapace.kafka_rest_apis.error_codes import RESTErrorCodes -from karapace.kafka_rest_apis.schema_cache import TopicSchemaCache from karapace.karapace import KarapaceBase from karapace.rapu import HTTPRequest, JSON_CONTENT_TYPE from karapace.schema_models import TypedSchema, ValidatedTypedSchema @@ -44,6 +36,14 @@ ) from karapace.typing import NameStrategy, SchemaId, Subject, SubjectType from karapace.utils import convert_to_int, json_encode +from rest_proxy.authentication import ( + get_auth_config_from_header, + get_expiration_time_from_header, + get_kafka_client_auth_parameters_from_config, +) +from rest_proxy.consumer_manager import ConsumerManager +from rest_proxy.error_codes import RESTErrorCodes +from rest_proxy.schema_cache import TopicSchemaCache from typing import Callable, TypedDict import asyncio diff --git a/src/rest_proxy/__main__.py b/src/rest_proxy/__main__.py new file mode 100644 index 000000000..2b0775131 --- /dev/null +++ b/src/rest_proxy/__main__.py @@ -0,0 +1,93 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from aiohttp.web_log import AccessLogger +from contextlib import closing +from karapace import version as karapace_version +from karapace.config import Config, read_config +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from karapace.utils import DebugAccessLogger +from rest_proxy import KafkaRest +from typing import Final + +import argparse +import logging +import sys + +PROGRAM_NAME: Final[str] = "karapace_rest_proxy" + + +def _configure_logging(*, config: Config) -> None: + log_level = config.get("log_level", "DEBUG") + log_format = config.get("log_format", "%(name)-20s\t%(threadName)s\t%(levelname)-8s\t%(message)s") + + root_handler: logging.Handler | None = None + log_handler = config.get("log_handler", None) + if "systemd" == log_handler: + from systemd import journal + + root_handler = journal.JournalHandler(SYSLOG_IDENTIFIER="karapace") + elif "stdout" == log_handler or log_handler is None: + root_handler = logging.StreamHandler(stream=sys.stdout) + else: + logging.basicConfig(level=logging.INFO, format=log_format) + logging.getLogger().setLevel(log_level) + logging.warning("Log handler %s not recognized, root handler not set.", log_handler) + + if root_handler is not None: + root_handler.setFormatter(logging.Formatter(log_format)) + root_handler.setLevel(log_level) + root_handler.set_name(name="karapace") + logging.root.addHandler(root_handler) + + logging.root.setLevel(log_level) + + if config.get("access_logs_debug") is True: + config["access_log_class"] = DebugAccessLogger + logging.getLogger("aiohttp.access").setLevel(logging.DEBUG) + else: + config["access_log_class"] = AccessLogger + + +def main() -> int: + parser = argparse.ArgumentParser( + prog=PROGRAM_NAME, + description="Karapace rest proxy: exposes an API over common Kafka operations, your Kafka essentials in one tool", + ) + parser.add_argument("--version", action="version", help="show program version", version=karapace_version.__version__) + parser.add_argument("config_file", help="configuration file path", type=argparse.FileType()) + arg = parser.parse_args() + + with closing(arg.config_file): + config = read_config(arg.config_file) + + logging.log(logging.INFO, "\n%s\\Co %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50)) + + _configure_logging(config=config) + + app = KafkaRest(config=config) + + logging.log(logging.INFO, "\n%s\nStarting %s\n%s", ("=" * 50), PROGRAM_NAME, ("=" * 50)) + + config_without_secrets = {} + for key, value in config.items(): + if "password" in key: + value = "****" + config_without_secrets[key] = value + logging.log(logging.DEBUG, "Config %r", config_without_secrets) + + try: + PrometheusInstrumentation.setup_metrics(app=app) + app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase` + except Exception as ex: # pylint: disable-broad-except + app.stats.unexpected_exception(ex=ex, where=f"{PROGRAM_NAME}_main") + raise + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/karapace/kafka_rest_apis/authentication.py b/src/rest_proxy/authentication.py similarity index 100% rename from src/karapace/kafka_rest_apis/authentication.py rename to src/rest_proxy/authentication.py diff --git a/src/karapace/kafka_rest_apis/consumer_manager.py b/src/rest_proxy/consumer_manager.py similarity index 99% rename from src/karapace/kafka_rest_apis/consumer_manager.py rename to src/rest_proxy/consumer_manager.py index 809478f4c..6c5b116a3 100644 --- a/src/karapace/kafka_rest_apis/consumer_manager.py +++ b/src/rest_proxy/consumer_manager.py @@ -19,11 +19,11 @@ from karapace.kafka.common import translate_from_kafkaerror from karapace.kafka.consumer import AsyncKafkaConsumer from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS, Timestamp -from karapace.kafka_rest_apis.authentication import get_kafka_client_auth_parameters_from_config -from karapace.kafka_rest_apis.error_codes import RESTErrorCodes from karapace.karapace import empty_response, KarapaceBase from karapace.serialization import DeserializationError, InvalidMessageHeader, InvalidPayload, SchemaRegistrySerializer from karapace.utils import convert_to_int, json_decode, JSONDecodeError +from rest_proxy.authentication import get_kafka_client_auth_parameters_from_config +from rest_proxy.error_codes import RESTErrorCodes from struct import error as UnpackError from urllib.parse import urljoin diff --git a/src/karapace/kafka_rest_apis/error_codes.py b/src/rest_proxy/error_codes.py similarity index 100% rename from src/karapace/kafka_rest_apis/error_codes.py rename to src/rest_proxy/error_codes.py diff --git a/src/karapace/kafka_rest_apis/schema_cache.py b/src/rest_proxy/schema_cache.py similarity index 100% rename from src/karapace/kafka_rest_apis/schema_cache.py rename to src/rest_proxy/schema_cache.py diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1673445ba..3e68b5968 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -19,8 +19,8 @@ from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.consumer import AsyncKafkaConsumer, KafkaConsumer from karapace.kafka.producer import AsyncKafkaProducer, KafkaProducer -from karapace.kafka_rest_apis import KafkaRest from pathlib import Path +from rest_proxy import KafkaRest from tests.conftest import KAFKA_VERSION from tests.integration.utils.cluster import RegistryDescription, RegistryEndpoint, start_schema_registry_cluster from tests.integration.utils.config import KafkaConfig, KafkaDescription, ZKConfig diff --git a/tests/integration/test_rest.py b/tests/integration/test_rest.py index ee504366b..6d406d04b 100644 --- a/tests/integration/test_rest.py +++ b/tests/integration/test_rest.py @@ -9,9 +9,9 @@ from karapace.client import Client from karapace.kafka.admin import KafkaAdminClient from karapace.kafka.producer import KafkaProducer -from karapace.kafka_rest_apis import KafkaRest, SUBJECT_VALID_POSTFIX from karapace.schema_type import SchemaType from karapace.version import __version__ +from rest_proxy import KafkaRest, SUBJECT_VALID_POSTFIX from tests.integration.conftest import REST_PRODUCER_MAX_REQUEST_BYTES from tests.utils import ( new_random_name, diff --git a/tests/integration/test_rest_consumer.py b/tests/integration/test_rest_consumer.py index f0003dbdd..357601fab 100644 --- a/tests/integration/test_rest_consumer.py +++ b/tests/integration/test_rest_consumer.py @@ -2,7 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.kafka_rest_apis.consumer_manager import KNOWN_FORMATS +from rest_proxy.consumer_manager import KNOWN_FORMATS from tests.utils import ( consumer_valid_payload, new_consumer, diff --git a/tests/unit/kafka_rest_apis/__init__.py b/tests/unit/rest_proxy/__init__.py similarity index 100% rename from tests/unit/kafka_rest_apis/__init__.py rename to tests/unit/rest_proxy/__init__.py diff --git a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py b/tests/unit/rest_proxy/test_rest_proxy_cluster_metadata_cache.py similarity index 99% rename from tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py rename to tests/unit/rest_proxy/test_rest_proxy_cluster_metadata_cache.py index b47fb5e02..bc40c783a 100644 --- a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py +++ b/tests/unit/rest_proxy/test_rest_proxy_cluster_metadata_cache.py @@ -4,8 +4,8 @@ See LICENSE for details """ from karapace.config import DEFAULTS -from karapace.kafka_rest_apis import UserRestProxy from karapace.serialization import SchemaRegistrySerializer +from rest_proxy import UserRestProxy from unittest.mock import patch import copy diff --git a/tests/unit/test_authentication.py b/tests/unit/test_authentication.py index 40abc5c01..e07b00384 100644 --- a/tests/unit/test_authentication.py +++ b/tests/unit/test_authentication.py @@ -6,13 +6,13 @@ from http import HTTPStatus from karapace.config import ConfigDefaults, set_config_defaults -from karapace.kafka_rest_apis.authentication import ( +from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE +from rest_proxy.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, get_kafka_client_auth_parameters_from_config, SimpleOauthTokenProvider, ) -from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE import base64 import datetime diff --git a/tests/unit/test_rest_auth.py b/tests/unit/test_rest_auth.py index 86bb14b8a..aee1b4b5e 100644 --- a/tests/unit/test_rest_auth.py +++ b/tests/unit/test_rest_auth.py @@ -6,7 +6,7 @@ from __future__ import annotations from karapace.config import set_config_defaults -from karapace.kafka_rest_apis import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy +from rest_proxy import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy from unittest.mock import call, Mock import asyncio