From d4ddc9293e44d5b399acff5ed4cbbca6b561fe74 Mon Sep 17 00:00:00 2001 From: brunneis Date: Tue, 10 Jan 2023 00:56:59 +0000 Subject: [PATCH] Python 3.11, basic auth, update styles, change logging --- docker/Dockerfile | 4 +- docker/config.yaml | 3 + src/.style.yapf | 4 ++ src/stopover_server/__main__.py | 48 +++++++-------- src/stopover_server/broker.py | 100 ++++++++++++++++++++++--------- src/stopover_server/partition.py | 36 ++++++----- src/stopover_server/utils.py | 25 ++++++++ src/stopover_server/version.py | 2 +- 8 files changed, 151 insertions(+), 71 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 4f76117..464f041 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM python:3.9-slim-buster as BUILD_IMAGE +FROM python:3.11-slim-buster as BUILD_IMAGE ARG ROCKSDB_VERSION=6.10.2 ARG ROCKSDB_BASE_URL=https://github.com/facebook/rocksdb/archive @@ -33,7 +33,7 @@ RUN \ apt-get -y install libev-dev && \ python setup.py install -FROM python:3.9-slim-buster +FROM python:3.11-slim-buster COPY --from=BUILD_IMAGE /usr/lib /usr/lib COPY --from=BUILD_IMAGE /usr/local/lib /usr/local/lib COPY --from=BUILD_IMAGE /usr/include /usr/include diff --git a/docker/config.yaml b/docker/config.yaml index 219ebb5..ce205e1 100644 --- a/docker/config.yaml +++ b/docker/config.yaml @@ -1,4 +1,7 @@ --- +auth: + guest: guest + global: port: 5704 data_dir: ./data diff --git a/src/.style.yapf b/src/.style.yapf index 94f96a1..74fcbf6 100644 --- a/src/.style.yapf +++ b/src/.style.yapf @@ -3,3 +3,7 @@ based_on_style=pep8 column_limit=79 split_before_arithmetic_operator=true split_before_logical_operator=true +split_before_named_assigns=true +split_before_first_argument=true +allow_split_before_dict_value=false +dedent_closing_brackets=true diff --git a/src/stopover_server/__main__.py b/src/stopover_server/__main__.py index 7b0d9f5..8e2fe24 100755 --- a/src/stopover_server/__main__.py +++ b/src/stopover_server/__main__.py @@ -10,35 +10,33 @@ import bjoern banner = f""" - ███████████ ███████████ - █████████████ ███████████████ -███████████████ █████████████████ - █████████████ ███████████████████ - ███████████ ███ █████████████████ - ███ ███████████████ - ███ ███████████ - ███ ███ - ███ ███ - ███ ███ - █████████████████ ███ - ███████████████████████ ███ - █████████████████████████ ███ - █████████████████████████████ - █████████████████████████████ - █████████████████████████████ - █████████████████████████████ - ███████████████████████████ - ███████████████████████ - ███████████████████ - ███████████ + █████████ + █████████ █████████████ + █████████████ █████████████████ + ███████████████ ███████████████████ + █████████████ █████████████████ + █████████ ███ █████████████ + ███ █████████ + ███ ███ + █████████████ ███ + █████████████████ ███ + █████████████████████ ███ + █████████████████████████ + ███████████████████████████ + █████████████████████████ + █████████████████████ + █████████████████ + █████████████ Stopover v{__version__} """ CONFIG_PATH = './config.yaml' logging.getLogger().setLevel(logging.INFO) -logging.basicConfig(format='%(asctime)-15s [%(levelname)s] %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') +logging.basicConfig( + format='%(asctime)-15s [%(levelname)s] %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) def main(): @@ -48,8 +46,8 @@ def main(): config = yaml.safe_load(input_file) try: - open(f"{config['global']['data_dir']}/streams/.active") - + file = open(f"{config['global']['data_dir']}/streams/.active") + file.close() except FileNotFoundError: logging.critical('the streams dir is not active') sys.exit(1) diff --git a/src/stopover_server/broker.py b/src/stopover_server/broker.py index e26b674..d0e43a0 100755 --- a/src/stopover_server/broker.py +++ b/src/stopover_server/broker.py @@ -12,6 +12,7 @@ import time import json import logging +import base64 class STATUS: @@ -22,6 +23,7 @@ class STATUS: def handle_error(method): + def _try_except(self, *args, **kwargs): try: return method(self, *args, **kwargs) @@ -39,9 +41,10 @@ def _try_except(self, *args, **kwargs): class Broker: + def __init__(self, config): self.config = config - logging.info(f'config: {json.dumps(config, indent=2)}') + utils.log_dict(self.config, prefix='⚙️ ') self.partitions_by_stream_lock = Lock() self.partitions_by_stream = {} @@ -58,12 +61,44 @@ def __init__(self, config): Thread(target=self._rebalance_loop, daemon=True).start() Thread(target=self._prune_loop, daemon=True).start() + def check_authenticated(self, headers): + if 'authorization' in headers: + return True + return False + + def check_authorized(self, headers): + token = headers['authorization'].split('Basic ')[1] + client_id, client_secret = base64.b64decode(token) \ + .decode('ascii').split(':') + return ( + client_id in self.config['auth'] + and client_secret == self.config['auth'][client_id] + ) + @staticmethod def on_get(request, response): response.content_type = 'text/html; charset=utf-8' response.body = f'Labteral Stopover {__version__}' def on_post(self, request, response): + headers = { + key.lower(): value + for (key, value) in request.headers.items() + } + + if 'auth' in self.config: + is_authenticated = self.check_authenticated(headers) + if not is_authenticated: + response.status = falcon.status_codes.HTTP_401 + return + + is_authorized = self.check_authorized( + headers + ) if is_authenticated else False + if not is_authorized: + response.status = falcon.status_codes.HTTP_403 + return + bin_data = request.stream.read() plain_response = False @@ -290,7 +325,8 @@ def _get_partition(self, stream: str, partition_number: int): self.partitions[stream][partition_number] = Partition( stream=stream, number=partition_number, - data_dir=self.config['global']['data_dir']) + data_dir=self.config['global']['data_dir'] + ) return self.partitions[stream][partition_number] def _get_receiver_partition_numbers( @@ -306,12 +342,13 @@ def _get_receiver_partition_numbers( if receiver_group not in self.partitions_by_group[stream]: self.partitions_by_group[stream][receiver_group] = {} - if receiver not in self.partitions_by_group[stream][ - receiver_group]: + if receiver not in self.partitions_by_group[stream][receiver_group + ]: self.partitions_by_group[stream][receiver_group][receiver] = [] return list( - self.partitions_by_group[stream][receiver_group][receiver]) + self.partitions_by_group[stream][receiver_group][receiver] + ) def _get_stream_path(self, stream: str) -> str: return f"{self.config['global']['data_dir']}/streams/{stream}/" @@ -325,8 +362,8 @@ def _get_stream_partition_numbers(self, stream: str): self.partitions_by_stream[stream] = partition_numbers try: - partitions_target = self.config['streams'][stream][ - 'partitions'] + partitions_target = self.config['streams'][stream]['partitions' + ] except KeyError: partitions_target = self.config['global']['partitions'] @@ -344,12 +381,15 @@ def _get_stream_partition_numbers(self, stream: str): partitions_target): if partition_number in partition_numbers: raise FileNotFoundError( - f'missing partitions among {partition_numbers}') - - Partition(stream=stream, - number=partition_number, - data_dir=self.config['global']['data_dir'], - create_if_missing=True) + f'missing partitions among {partition_numbers}' + ) + + Partition( + stream=stream, + number=partition_number, + data_dir=self.config['global']['data_dir'], + create_if_missing=True + ) partition_numbers.append(partition_number) return self.partitions_by_stream[stream] @@ -359,16 +399,15 @@ def _rebalance_loop(self): self._rebalance() remaining_seconds = self.config['global']['rebalance_interval'] logging.debug( - f"next rebalance will hapen in {remaining_seconds} seconds") + f"next rebalance will hapen in {remaining_seconds} seconds" + ) time.sleep(self.config['global']['rebalance_interval']) def _rebalance(self): with self.partitions_by_group_lock: logging.debug('rebalancing...') if self.partitions_by_group: - logging.info( - 'assignments: ' - f'{json.dumps(self.partitions_by_group, indent=4)}') + utils.log_dict(self.partitions_by_group) receivers_to_remove = [] for stream in self.partitions_by_group: @@ -389,7 +428,8 @@ def _rebalance(self): else: receivers_to_remove.append( - (stream, receiver_group, receiver)) + (stream, receiver_group, receiver) + ) stream_partition_numbers = \ self._get_stream_partition_numbers(stream) @@ -410,22 +450,22 @@ def _rebalance(self): step): receiver_index = index // step self.partitions_by_group[stream][receiver_group][ - stream_receiver_group_receivers[ - receiver_index]] = stream_partition_numbers[ - index:index + step] + stream_receiver_group_receivers[receiver_index] + ] = stream_partition_numbers[index:index + step] for index in range(number_of_partitions - remainder, number_of_partitions): receiver_index = index - number_of_partitions + 1 self.partitions_by_group[stream][receiver_group][ - stream_receiver_group_receivers[ - receiver_index]].append( - stream_partition_numbers[index]) + stream_receiver_group_receivers[receiver_index] + ].append(stream_partition_numbers[index]) for stream, receiver_group, receiver in receivers_to_remove: - logging.info(f'receiver "{receiver}" kicked from the ' - f'receiver_group "{receiver_group}" ' - f'for the stream "{stream}"') + logging.info( + f'receiver "{receiver}" kicked from the ' + f'receiver_group "{receiver_group}" ' + f'for the stream "{stream}"' + ) del self.partitions_by_group[stream][receiver_group][receiver] if receiver in self.last_seen_by_group[receiver_group]: del self.last_seen_by_group[receiver_group][receiver] @@ -473,8 +513,10 @@ def _prune_loop(self): for partition_number in partition_numbers: logging.info( - f'pruning stream {stream} ({partition_number})') + f'pruning stream {stream} ({partition_number})' + ) partition = self._get_partition( - stream, partition_number) + stream, partition_number + ) partition.prune(int(ttl)) diff --git a/src/stopover_server/partition.py b/src/stopover_server/partition.py index 1ec2791..a755d59 100755 --- a/src/stopover_server/partition.py +++ b/src/stopover_server/partition.py @@ -13,10 +13,13 @@ class PartitionItem: - def __init__(self, - value: bytes = None, - timestamp: int = None, - item_dict: Dict = None): + + def __init__( + self, + value: bytes = None, + timestamp: int = None, + item_dict: Dict = None + ): if item_dict is not None: self._load_from_dict(item_dict) else: @@ -47,11 +50,13 @@ class Partition: INDEX = b'\x01' OFFSET = b'\x02' - def __init__(self, - stream: str, - number: int, - data_dir: str, - create_if_missing: bool = False): + def __init__( + self, + stream: str, + number: int, + data_dir: str, + create_if_missing: bool = False + ): self.lock = Lock() self.stream = stream self.number = number @@ -103,7 +108,7 @@ def get(self, receiver_group: str, index=None) -> dict: partition_item = self._get_by_index(receiver_index) if partition_item is None: - return + return None partition_item_dict = partition_item.dict partition_item_dict['index'] = receiver_index @@ -113,8 +118,10 @@ def commit(self, offset: int, receiver: str): with self.lock: expected_offset = self._get_offset(receiver) + 1 if offset != expected_offset: - raise ValueError(f'trying to commit offset {offset} ' - f'but expecting {expected_offset}') + raise ValueError( + f'trying to commit offset {offset} ' + f'but expecting {expected_offset}' + ) self._increase_offset(receiver) def set_offset(self, receiver: str, offset: int): @@ -147,7 +154,7 @@ def prune(self, ttl: int): logging.debug(f'Deleting {key}') self._store.delete(key) - def _get_by_index(self, index: int) -> bytes: + def _get_by_index(self, index: int) -> PartitionItem: message_key = self._get_message_key(index) value = self._store.get(message_key) if value is None: @@ -196,5 +203,6 @@ def _get_offset_key(receiver: str) -> bytes: @staticmethod def _get_message_key(index: int) -> bytes: message_key = Partition.MESSAGE + int_to_padded_bytes( - index, UINT_BYTES) + index, UINT_BYTES + ) return message_key diff --git a/src/stopover_server/utils.py b/src/stopover_server/utils.py index 20b63e5..a34d3f6 100755 --- a/src/stopover_server/utils.py +++ b/src/stopover_server/utils.py @@ -6,6 +6,7 @@ import time import random import hashlib +import logging def pack(message: dict) -> bytes: @@ -55,3 +56,27 @@ def get_partition_number(partition_numbers, key=None): partition = partition_numbers[partition_index] return partition + + +def log_dict( + config, + prefix=None, + key_prefix=None, +): + if prefix is None: + prefix = '' + + if key_prefix is None: + key_prefix = '' + + for key, value in config.items(): + if isinstance(value, dict): + if key == 'auth': + value = dict(map(lambda k: (k, ''), value.keys())) + log_dict( + config=value, + prefix=prefix, + key_prefix=f'{key_prefix}{key}.', + ) + else: + logging.info(f'{prefix}{key_prefix}{key}={value}') diff --git a/src/stopover_server/version.py b/src/stopover_server/version.py index 7c04530..1f329d6 100755 --- a/src/stopover_server/version.py +++ b/src/stopover_server/version.py @@ -1,4 +1,4 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -__version__ = '2.2112.0' +__version__ = '2.2301.0'