Skip to content

Commit

Permalink
Merge pull request #540 from aiven/fleshgrinder/backup-topic-creation…
Browse files Browse the repository at this point in the history
…-retries

Fix Backup Topic Creation Retries and Interrupts
  • Loading branch information
jjaakola-aiven authored Feb 15, 2023
2 parents 23b8893 + 676b3d8 commit 4fd802d
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 86 deletions.
204 changes: 121 additions & 83 deletions karapace/schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@
from enum import Enum
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient
from kafka.errors import KafkaError, NoBrokersAvailable, NodeNotReadyError, TopicAlreadyExistsError
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.structs import PartitionMetadata
from karapace import constants
from karapace.anonymize_schemas import anonymize_avro
from karapace.config import Config, read_config
from karapace.key_format import KeyFormatter
from karapace.schema_reader import new_schema_topic_from_config
from karapace.typing import JsonData
from karapace.utils import json_decode, json_encode, KarapaceKafkaClient, Timeout
from karapace.utils import json_decode, json_encode, KarapaceKafkaClient
from pathlib import Path
from tempfile import mkstemp
from tenacity import retry, RetryCallState, stop_after_delay, wait_fixed
from typing import AbstractSet, Callable, IO, Optional, TextIO, Tuple, Union

import argparse
Expand All @@ -26,7 +27,6 @@
import logging
import os
import sys
import time

LOG = logging.getLogger(__name__)

Expand All @@ -49,6 +49,27 @@ class PartitionCountError(BackupError):
pass


def __before_sleep(description: str) -> Callable[[RetryCallState], None]:
"""Returns a function to print a user-friendly message before going to sleep in retries.
:param description: of the action, should compose well with _failed_ and _returned_ as next words.
:returns: a function that can be used in ``tenacity.retry``'s ``before_sleep`` argument for printing a user-friendly
message that explains which action failed, that a retry is going to happen, and how to abort if desired.
"""

def before_sleep(it: RetryCallState) -> None:
outcome = it.outcome
if outcome is None:
result = "did not complete yet"
elif outcome.failed:
result = f"failed ({outcome.exception()})"
else:
result = f"returned {outcome.result()!r}"
print(f"{description} {result}, retrying... (Ctrl+C to abort)", file=sys.stderr)

return before_sleep


def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[PartitionMetadata]]) -> None:
"""Checks that the given topic has exactly one partition.
Expand All @@ -64,6 +85,80 @@ def __check_partition_count(topic: str, supplier: Callable[[str], AbstractSet[Pa
)


@contextlib.contextmanager
def _admin(config: Config) -> KafkaAdminClient:
"""Creates an automatically closing Kafka admin client.
:param config: for the client.
:raises Exception: if client creation fails, concrete exception types are unknown, see Kafka implementation.
"""

@retry(
before_sleep=__before_sleep("Kafka Admin client creation"),
reraise=True,
stop=stop_after_delay(60), # seconds
wait=wait_fixed(1), # seconds
)
def __admin() -> KafkaAdminClient:
return KafkaAdminClient(
api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS,
bootstrap_servers=config["bootstrap_uri"],
client_id=config["client_id"],
security_protocol=config["security_protocol"],
ssl_cafile=config["ssl_cafile"],
ssl_certfile=config["ssl_certfile"],
ssl_keyfile=config["ssl_keyfile"],
kafka_client=KarapaceKafkaClient,
)

admin = __admin()
try:
yield admin
finally:
admin.close()


@retry(
before_sleep=__before_sleep("Schemas topic creation"),
reraise=True,
stop=stop_after_delay(60), # seconds
wait=wait_fixed(1), # seconds
)
def _maybe_create_topic(config: Config, name: Optional[str] = None) -> Optional[bool]:
"""Creates the topic if the given name and the one in the config are the same.
:param config: for the admin client.
:param name: of the topic to create.
:returns: ``True`` if the topic was created, ``False`` if it already exists, and ``None`` if the given name does not
match the name of the schema topic in the config, in which case nothing has been done.
:raises Exception: if topic creation fails, concrete exception types are unknown, see Kafka implementation.
"""
topic = new_schema_topic_from_config(config)

if name is not None and topic.name != name:
LOG.warning(
"Not creating topic, because the name %r from the config and the name %r from the CLI differ.",
topic.name,
name,
)
return None

with _admin(config) as admin:
try:
admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
LOG.info(
"Created topic %r (partition count: %s, replication factor: %s, config: %s)",
topic.name,
topic.num_partitions,
topic.replication_factor,
topic.topic_configs,
)
return True
except TopicAlreadyExistsError:
LOG.debug("Topic %r already exists", topic.name)
return False


@contextlib.contextmanager
def _consumer(config: Config, topic: str) -> KafkaConsumer:
"""Creates an automatically closing Kafka consumer client.
Expand Down Expand Up @@ -147,7 +242,6 @@ def _writer(file: Union[str, Path], *, overwrite: Optional[bool] = None) -> Text
dst = file.absolute()

def check_dst() -> None:
nonlocal overwrite, dst
if dst.exists():
if overwrite is not True:
raise FileExistsError(f"--location already exists at {dst}, use --overwrite to replace the file.")
Expand Down Expand Up @@ -195,7 +289,6 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
self.config = config
self.backup_location = backup_path
self.topic_name = topic_option or self.config["topic_name"]
self.admin_client = None
self.timeout_ms = 1000
self.timeout_kafka_producer = 5

Expand All @@ -204,70 +297,11 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
if self.topic_name == constants.DEFAULT_SCHEMA_TOPIC or self.config.get("force_key_correction", False):
self.key_formatter = KeyFormatter()

def init_admin_client(self) -> None:
start_time = time.monotonic()
wait_time = constants.MINUTE
while True:
if time.monotonic() - start_time > wait_time:
raise Timeout(f"Timeout ({wait_time}) on creating admin client")

try:
self.admin_client = KafkaAdminClient(
api_version_auto_timeout_ms=constants.API_VERSION_AUTO_TIMEOUT_MS,
bootstrap_servers=self.config["bootstrap_uri"],
client_id=self.config["client_id"],
security_protocol=self.config["security_protocol"],
ssl_cafile=self.config["ssl_cafile"],
ssl_certfile=self.config["ssl_certfile"],
ssl_keyfile=self.config["ssl_keyfile"],
kafka_client=KarapaceKafkaClient,
)
break
except (NodeNotReadyError, NoBrokersAvailable, AssertionError):
LOG.warning("No Brokers available yet, retrying init_admin_client()")
except: # pylint: disable=bare-except
LOG.exception("Failed to initialize admin client, retrying init_admin_client()")

time.sleep(2.0)

def _create_schema_topic_if_needed(self) -> None:
if self.topic_name != self.config["topic_name"]:
LOG.info("Topic name overridden, not creating a topic with schema configuration")
return

self.init_admin_client()
start_time = time.monotonic()
wait_time = constants.MINUTE
while True:
if time.monotonic() - start_time > wait_time:
raise Timeout(f"Timeout ({wait_time}) on creating admin client")

schema_topic = new_schema_topic_from_config(self.config)
try:
LOG.info("Creating schema topic: %r", schema_topic)
self.admin_client.create_topics([schema_topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS)
LOG.info("Topic: %r created successfully", self.config["topic_name"])
break
except TopicAlreadyExistsError:
LOG.info("Topic: %r already exists", self.config["topic_name"])
break
except: # pylint: disable=bare-except
LOG.exception(
"Failed to create topic: %r, retrying _create_schema_topic_if_needed()", self.config["topic_name"]
)
time.sleep(5)

def close(self) -> None:
LOG.info("Closing schema backup reader")
if self.admin_client:
self.admin_client.close()
self.admin_client = None

def restore_backup(self) -> None:
if not os.path.exists(self.backup_location):
raise BackupError("Backup location doesn't exist")

self._create_schema_topic_if_needed()
_maybe_create_topic(self.config, self.topic_name)

with _producer(self.config, self.topic_name) as producer:
LOG.info("Starting backup restore for topic: %r", self.topic_name)
Expand All @@ -277,7 +311,6 @@ def restore_backup(self) -> None:
self._restore_backup_version_2(producer, fp)
else:
self._restore_backup_version_1_single_array(producer, fp)
self.close()

def _handle_restore_message(self, producer: KafkaProducer, item: Tuple[str, str]) -> None:
key = self.encode_key(item[0])
Expand Down Expand Up @@ -329,7 +362,6 @@ def export(self, export_func, *, overwrite: Optional[bool] = None) -> None:
fp.write(ser)

LOG.info("Schema export written to %r", "stdout" if fp is sys.stdout else self.backup_location)
self.close()

def encode_key(self, key: Optional[Union[JsonData, str]]) -> Optional[bytes]:
if key == "null":
Expand Down Expand Up @@ -396,25 +428,31 @@ def parse_args():
return parser.parse_args()


def main() -> int:
args = parse_args()
def main() -> None:
try:
args = parse_args()

with open(args.config, encoding="utf8") as handler:
config = read_config(handler)
with open(args.config, encoding="utf8") as handler:
config = read_config(handler)

sb = SchemaBackup(config, args.location, args.topic)
sb = SchemaBackup(config, args.location, args.topic)

if args.command == "get":
sb.export(serialize_record, overwrite=args.overwrite)
return 0
if args.command == "restore":
sb.restore_backup()
return 0
if args.command == "export-anonymized-avro-schemas":
sb.export(anonymize_avro_schema_message, overwrite=args.overwrite)
return 0
return 1
if args.command == "get":
sb.export(serialize_record, overwrite=args.overwrite)
elif args.command == "restore":
sb.restore_backup()
elif args.command == "export-anonymized-avro-schemas":
sb.export(anonymize_avro_schema_message, overwrite=args.overwrite)
else:
# Only reachable if a new subcommand was added that is not mapped above. There are other ways with argparse
# to handle this, but all rely on the programmer doing exactly the right thing. Only switching to another
# CLI framework would provide the ability to not handle this situation manually while ensuring that it is
# not possible to add a new subcommand without also providing a handler for it.
raise SystemExit(f"Entered unreachable code, unknown command: {args.command!r}")
except KeyboardInterrupt as e:
# Not an error -- user choice -- and thus should not end up in a Python stacktrace.
raise SystemExit(2) from e


if __name__ == "__main__":
sys.exit(main())
main()
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ jsonschema==3.2.0
networkx==2.5
protobuf==3.19.5
python-dateutil==2.8.2
tenacity==8.0.1
ujson==5.7.0

# Using 0.15.0 until following issue gets fixed.
Expand Down
70 changes: 67 additions & 3 deletions tests/unit/test_schema_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,83 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from kafka import KafkaConsumer, KafkaProducer
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka.structs import PartitionMetadata
from karapace import config
from karapace.config import Config
from karapace.schema_backup import _consumer, _producer, _writer, PartitionCountError
from karapace.constants import DEFAULT_SCHEMA_TOPIC
from karapace.schema_backup import _admin, _consumer, _maybe_create_topic, _producer, _writer, PartitionCountError
from pathlib import Path
from types import FunctionType
from typing import AbstractSet, Callable, Type, Union
from typing import AbstractSet, Callable, List, Type, Union
from unittest import mock
from unittest.mock import MagicMock

import pytest
import sys

ADMIN_NEW_FQN = f"{KafkaAdminClient.__module__}.{KafkaAdminClient.__qualname__}.__new__"


class TestAdmin:
@mock.patch(ADMIN_NEW_FQN, autospec=True)
def test_auto_closing(self, admin_new: MagicMock):
admin_mock = admin_new.return_value
with _admin(config.DEFAULTS) as admin:
assert admin is admin_mock
assert admin_mock.close.call_count == 1

@mock.patch("time.sleep", autospec=True)
@mock.patch(ADMIN_NEW_FQN, autospec=True)
def test_retry(self, admin_new: MagicMock, sleep_mock: MagicMock) -> None:
admin_mock = admin_new.return_value
admin_new.side_effect = [Exception("1"), Exception("2"), admin_mock]
with _admin(config.DEFAULTS) as admin:
assert admin is admin_mock
assert sleep_mock.call_count == 2 # proof that we waited between retries
assert admin_mock.close.call_count == 1

@pytest.mark.parametrize("e", (KeyboardInterrupt, SystemExit))
@mock.patch("time.sleep", autospec=True)
@mock.patch(ADMIN_NEW_FQN, autospec=True)
def test_interrupts(self, admin_new: MagicMock, sleep_mock: MagicMock, e: Type[BaseException]) -> None:
admin_new.side_effect = [e()]
with pytest.raises(e):
with _admin(config.DEFAULTS):
pass
assert sleep_mock.call_count == 0 # proof that we did not retry


class TestMaybeCreateTopic:
@mock.patch(ADMIN_NEW_FQN, autospec=True)
def test_ok(self, admin_new: MagicMock) -> None:
assert _maybe_create_topic(config.DEFAULTS) is True
create_topics: MagicMock = admin_new.return_value.create_topics
assert create_topics.call_count == 1
topic_list: List[NewTopic] = create_topics.call_args[0][0]
assert len(topic_list) == 1
assert topic_list[0].name == DEFAULT_SCHEMA_TOPIC

@mock.patch(ADMIN_NEW_FQN, autospec=True)
def test_exists(self, admin_new: MagicMock) -> None:
create_topics: MagicMock = admin_new.return_value.create_topics
create_topics.side_effect = TopicAlreadyExistsError()
assert _maybe_create_topic(config.DEFAULTS) is False

@mock.patch("time.sleep", autospec=True)
@mock.patch(ADMIN_NEW_FQN, autospec=True)
def test_retry(self, admin_new: MagicMock, sleep_mock: MagicMock) -> None:
create_topics: MagicMock = admin_new.return_value.create_topics
create_topics.side_effect = [Exception("1"), Exception("2"), None]
assert _maybe_create_topic(config.DEFAULTS) is True
assert sleep_mock.call_count == 2 # proof that we waited between retries

def test_name_override(self) -> None:
assert "custom-name" != DEFAULT_SCHEMA_TOPIC
assert _maybe_create_topic(config.DEFAULTS, "custom-name") is None


class TestClients:
@staticmethod
Expand Down

0 comments on commit 4fd802d

Please sign in to comment.