diff --git a/CHANGELOG.md b/CHANGELOG.md index 993f430..fe8ba45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [2.2.2] - 2026-01-03 +### Added +- Detailed typing to all classes + ## [2.2.1] - 2023-09-03 ### Added - channel_name to creator diff --git a/README.md b/README.md index 6206b4f..6bd52fa 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Async-Channel [2.2.1](https://github.com/Drakkar-Software/Async-Channel/blob/master/CHANGELOG.md) +# Async-Channel [2.2.2](https://github.com/Drakkar-Software/Async-Channel/blob/master/CHANGELOG.md) [![Codacy Badge](https://app.codacy.com/project/badge/Grade/523d43c62f1d4de08395752367f5fddc)](https://www.codacy.com/gh/Drakkar-Software/Async-Channel/dashboard?utm_source=github.com&utm_medium=referral&utm_content=Drakkar-Software/Async-Channel&utm_campaign=Badge_Grade) [![PyPI](https://img.shields.io/pypi/v/async-channel.svg)](https://pypi.python.org/pypi/async-channel/) [![Github-Action-CI](https://github.com/Drakkar-Software/Async-Channel/workflows/Async-Channel-Default-CI/badge.svg)](https://github.com/Drakkar-Software/Async-Channel/actions) diff --git a/async_channel/__init__.py b/async_channel/__init__.py index bed7369..d03244e 100644 --- a/async_channel/__init__.py +++ b/async_channel/__init__.py @@ -37,7 +37,7 @@ ) PROJECT_NAME = "async-channel" -VERSION = "2.2.1" # major.minor.revision +VERSION = "2.2.2" # major.minor.revision __all__ = [ "CHANNEL_WILDCARD", diff --git a/async_channel/channels/channel.py b/async_channel/channels/channel.py index 85e9e43..f71f521 100644 --- a/async_channel/channels/channel.py +++ b/async_channel/channels/channel.py @@ -23,6 +23,9 @@ import async_channel.enums import async_channel.channels.channel_instances as channel_instances +if typing.TYPE_CHECKING: + import async_channel.producer + # pylint: disable=undefined-variable, not-callable class Channel: @@ -34,10 +37,14 @@ class Channel: """ # Channel producer class - PRODUCER_CLASS = None + PRODUCER_CLASS: typing.Optional[typing.Type["async_channel.producer.Producer"]] = ( + None + ) # Channel consumer class - CONSUMER_CLASS = None + CONSUMER_CLASS: typing.Optional[typing.Type["async_channel.consumer.Consumer"]] = ( + None + ) # Consumer instance in consumer filters INSTANCE_KEY = "consumer_instance" @@ -51,22 +58,26 @@ def __init__(self): self.logger = logging.get_logger(self.__class__.__name__) # Channel unique id - self.chan_id = None + self.chan_id: typing.Optional[str] = None # Channel subscribed producers list - self.producers = [] + self.producers: list["async_channel.producer.Producer"] = [] - # Channel subscribed consumers list - self.consumers = [] + # Channel subscribed consumers list: list dicts of dicts containing: + # - At least a consumer instance under the INSTANCE_KEY key + # - Possibly other filters under other keys and values + self.consumers: list[dict[str, typing.Any]] = [] # Used to perform global send from non-producer context - self.internal_producer = None + self.internal_producer: typing.Optional["async_channel.producer.Producer"] = ( + None + ) # Used to save producers state (paused or not) - self.is_paused = True + self.is_paused: bool = True # Used to synchronize producers and consumer - self.is_synchronized = False + self.is_synchronized: bool = False @classmethod def get_name(cls) -> str: @@ -80,11 +91,11 @@ def get_name(cls) -> str: async def new_consumer( self, callback: object = None, - consumer_filters: dict = None, - internal_consumer: object = None, + consumer_filters: typing.Optional[dict] = None, + internal_consumer: typing.Optional["async_channel.consumer.Consumer"] = None, size: int = 0, priority_level: int = DEFAULT_PRIORITY_LEVEL, - ) -> CONSUMER_CLASS: + ) -> "async_channel.consumer.Consumer": """ Create an appropriate consumer instance for this async_channel and add it to the consumer list Should end by calling '_check_producers_state' @@ -98,7 +109,7 @@ async def new_consumer( consumer = ( internal_consumer if internal_consumer - else self.CONSUMER_CLASS(callback, size=size, priority_level=priority_level) + else self.CONSUMER_CLASS(callback, size=size, priority_level=priority_level) # type: ignore ) await self._add_new_consumer_and_run(consumer, consumer_filters) await self._check_producers_state() @@ -106,7 +117,10 @@ async def new_consumer( # pylint: disable=unused-argument async def _add_new_consumer_and_run( - self, consumer: CONSUMER_CLASS, consumer_filters: dict, **kwargs + self, + consumer: "async_channel.consumer.Consumer", + consumer_filters: typing.Optional[dict], + **kwargs, ) -> None: """ Should be called by 'new_consumer' to add the consumer to self.consumers and call 'consumer.run()' @@ -120,7 +134,9 @@ async def _add_new_consumer_and_run( self.add_new_consumer(consumer, consumer_filters) await consumer.run(with_task=not self.is_synchronized) - def add_new_consumer(self, consumer, consumer_filters) -> None: + def add_new_consumer( + self, consumer: "async_channel.consumer.Consumer", consumer_filters: dict + ) -> None: """ Add a new consumer to consumer list with filters :param consumer: the consumer to add @@ -130,7 +146,9 @@ def add_new_consumer(self, consumer, consumer_filters) -> None: consumer_filters[self.INSTANCE_KEY] = consumer self.consumers.append(consumer_filters) - def get_consumer_from_filters(self, consumer_filters) -> list: + def get_consumer_from_filters( + self, consumer_filters: dict + ) -> list["async_channel.consumer.Consumer"]: """ Returns the instance filtered consumers list WARNING: @@ -141,7 +159,7 @@ def get_consumer_from_filters(self, consumer_filters) -> list: """ return self._filter_consumers(consumer_filters) - def get_consumers(self) -> list: + def get_consumers(self) -> list["async_channel.consumer.Consumer"]: """ Returns all consumers instance Can be overwritten according to the class needs @@ -149,7 +167,9 @@ def get_consumers(self) -> list: """ return [consumer[self.INSTANCE_KEY] for consumer in self.consumers] - def get_prioritized_consumers(self, priority_level) -> list: + def get_prioritized_consumers( + self, priority_level: int + ) -> list["async_channel.consumer.Consumer"]: """ Returns all consumers instance Can be overwritten according to the class needs @@ -161,7 +181,9 @@ def get_prioritized_consumers(self, priority_level) -> list: if consumer[self.INSTANCE_KEY].priority_level <= priority_level ] - def _filter_consumers(self, consumer_filters) -> list: + def _filter_consumers( + self, consumer_filters: dict + ) -> list["async_channel.consumer.Consumer"]: """ Returns the consumers that match the selection Returns all consumer instances if consumer_filter is empty @@ -174,7 +196,9 @@ def _filter_consumers(self, consumer_filters) -> list: if _check_filters(consumer, consumer_filters) ] - async def remove_consumer(self, consumer: CONSUMER_CLASS) -> None: + async def remove_consumer( + self, consumer: "async_channel.consumer.Consumer" + ) -> None: """ Should be overwritten according to the class needs Should end by calling '_check_producers_state' and then 'consumer.stop' @@ -234,7 +258,9 @@ def _should_resume_producers(self) -> bool: return True return False - async def register_producer(self, producer) -> None: + async def register_producer( + self, producer: "async_channel.producer.Producer" + ) -> None: """ Add the producer to producers list Can be overwritten to perform additional action when registering @@ -247,7 +273,7 @@ async def register_producer(self, producer) -> None: if self.is_paused: await producer.pause() - def unregister_producer(self, producer) -> None: + def unregister_producer(self, producer: "async_channel.producer.Producer") -> None: """ Remove the producer from producers list Can be overwritten to perform additional action when registering @@ -256,7 +282,7 @@ def unregister_producer(self, producer) -> None: if producer in self.producers: self.producers.remove(producer) - def get_producers(self) -> typing.Iterable: + def get_producers(self) -> typing.Iterable["async_channel.producer.Producer"]: """ Should be overwritten according to the class needs :return: async_channel producers iterable @@ -306,7 +332,7 @@ async def modify(self, **kwargs) -> None: for producer in self.get_producers(): await producer.modify(**kwargs) - def get_internal_producer(self, **kwargs) -> PRODUCER_CLASS: + def get_internal_producer(self, **kwargs) -> "async_channel.producer.Producer": """ Returns internal producer if exists else creates it :param kwargs: arguments for internal producer __init__ @@ -314,14 +340,14 @@ def get_internal_producer(self, **kwargs) -> PRODUCER_CLASS: """ if not self.internal_producer: try: - self.internal_producer = self.PRODUCER_CLASS(self, **kwargs) + self.internal_producer = self.PRODUCER_CLASS(self, **kwargs) # type: ignore except TypeError: self.logger.exception("PRODUCER_CLASS not defined") raise return self.internal_producer -def set_chan(chan, name) -> Channel: +def set_chan(chan: Channel, name: str) -> Channel: """ Set a new Channel instance in the channels list according to channel name :param chan: new Channel instance @@ -335,7 +361,7 @@ def set_chan(chan, name) -> Channel: raise ValueError(f"Channel {chan_name} already exists.") -def del_chan(name) -> None: +def del_chan(name: str) -> None: """ Delete a Channel instance from the channels list according to channel name :param name: name of the channel to delete @@ -344,7 +370,7 @@ def del_chan(name) -> None: channel_instances.ChannelInstances.instance().channels.pop(name, None) -def get_chan(chan_name) -> Channel: +def get_chan(chan_name: str) -> Channel: """ Return the channel instance from channel name :param chan_name: the channel name @@ -353,7 +379,7 @@ def get_chan(chan_name) -> Channel: return channel_instances.ChannelInstances.instance().channels[chan_name] -def _check_filters(consumer_filters, expected_filters) -> bool: +def _check_filters(consumer_filters: dict, expected_filters: dict) -> bool: """ Checks if the consumer match the specified filters Returns True if expected_filters is empty diff --git a/async_channel/channels/channel_instances.py b/async_channel/channels/channel_instances.py index f726c78..6bbd90f 100644 --- a/async_channel/channels/channel_instances.py +++ b/async_channel/channels/channel_instances.py @@ -16,8 +16,12 @@ """ This module defines created Channels interaction methods """ +import typing import async_channel.util.logging_util as logging +if typing.TYPE_CHECKING: + import async_channel.channels.channel + class ChannelInstances: """ @@ -28,7 +32,7 @@ class ChannelInstances: _instances = {} @classmethod - def instance(cls, *args, **kwargs): + def instance(cls, *args, **kwargs) -> "ChannelInstances": """ Create the instance if not already created Return the class instance @@ -41,10 +45,14 @@ def instance(cls, *args, **kwargs): return cls._instances[cls] def __init__(self): - self.channels = {} + self.channels: dict[ + str, dict[str, "async_channel.channels.channel.Channel"] + ] = {} -def set_chan_at_id(chan, name) -> None: +def set_chan_at_id( + chan: "async_channel.channels.channel.Channel", name: str +) -> "async_channel.channels.channel.Channel": """ Add a new async_channel to the channels instances dictionary at chan.id :param chan: the channel instance @@ -64,7 +72,7 @@ def set_chan_at_id(chan, name) -> None: raise ValueError(f"Channel {chan_name} already exists.") -def get_channels(chan_id) -> dict: +def get_channels(chan_id: str) -> dict[str, "async_channel.channels.channel.Channel"]: """ Get async_channel instances by async_channel id :param chan_id: the channel id @@ -76,7 +84,7 @@ def get_channels(chan_id) -> dict: raise KeyError(f"Channels not found with chan_id: {chan_id}") from exception -def del_channel_container(chan_id) -> None: +def del_channel_container(chan_id: str) -> None: """ Delete all async_channel id instances :param chan_id: the channel id @@ -84,7 +92,9 @@ def del_channel_container(chan_id) -> None: ChannelInstances.instance().channels.pop(chan_id, None) -def get_chan_at_id(chan_name, chan_id) -> object: +def get_chan_at_id( + chan_name: str, chan_id: str +) -> "async_channel.channels.channel.Channel": """ Get the channel instance that matches the name and the id :param chan_name: the channel name @@ -99,7 +109,7 @@ def get_chan_at_id(chan_name, chan_id) -> object: ) from exception -def del_chan_at_id(chan_name, chan_id) -> None: +def del_chan_at_id(chan_name: str, chan_id: str) -> None: """ Delete the channel instance that matches the name and the id :param chan_name: the channel name diff --git a/async_channel/consumer.py b/async_channel/consumer.py index d6a09f8..349d49c 100644 --- a/async_channel/consumer.py +++ b/async_channel/consumer.py @@ -17,6 +17,7 @@ Define async_channel Consumer class """ import asyncio +import typing import async_channel.util.logging_util as logging import async_channel.enums @@ -32,32 +33,32 @@ class Consumer: def __init__( self, - callback: object, + callback: typing.Callable, size: int = async_channel.constants.DEFAULT_QUEUE_SIZE, priority_level: int = async_channel.enums.ChannelConsumerPriorityLevels.HIGH.value, ): self.logger = logging.get_logger(self.__class__.__name__) # Consumer data queue. It contains producer's work (received through Producer.send()). - self.queue = asyncio.Queue(maxsize=size) + self.queue: asyncio.Queue = asyncio.Queue(maxsize=size) # Method to be called when performing task is done - self.callback = callback + self.callback: typing.Callable = callback # Should only be used with .cancel() - self.consume_task = None + self.consume_task: typing.Optional[asyncio.Task] = None """ Should be used as the perform while loop condition >>> while(self.should_stop): ... """ - self.should_stop = False + self.should_stop: bool = False # Default priority level # Used by Producers to call consumers by prioritization # The lowest level has the highest priority - self.priority_level = priority_level + self.priority_level: int = priority_level async def consume(self) -> None: """ @@ -70,9 +71,9 @@ async def consume(self) -> None: self.logger.debug("Cancelled task") except Exception as consume_exception: # pylint: disable=broad-except self.logger.exception( - exception=consume_exception, - publish_error_if_necessary=True, - error_message=f"Exception when calling callback on {self}: {consume_exception}", + consume_exception, + publish_error_if_necessary=True, # type: ignore + error_message=f"Exception when calling callback on {self}: {consume_exception}", # type: ignore ) finally: await self.consume_ends() @@ -109,7 +110,7 @@ def create_task(self) -> None: """ self.consume_task = asyncio.create_task(self.consume()) - async def run(self, with_task=True) -> None: + async def run(self, with_task: bool = True) -> None: """ - Initialize the consumer - Start the consumer main task @@ -119,7 +120,7 @@ async def run(self, with_task=True) -> None: if with_task: self.create_task() - async def join(self, timeout) -> None: + async def join(self, timeout: float) -> None: """ Implemented in SupervisedConsumer to wait for any "perform" call to be finished. Instantly returns on regular consumer @@ -132,7 +133,7 @@ async def join_queue(self) -> None: Instantly returns on regular consumer """ - def __str__(self): + def __str__(self) -> str: return f"{self.__class__.__name__} with callback: {self.callback.__name__}" @@ -146,7 +147,7 @@ def __init__(self): The constructor only override the callback to be the 'internal_callback' method """ super().__init__(None) - self.callback = self.internal_callback + self.callback: typing.Callable = self.internal_callback async def internal_callback(self, **kwargs: dict) -> None: """ @@ -163,7 +164,7 @@ class SupervisedConsumer(Consumer): def __init__( self, - callback: object, + callback: typing.Callable, size: int = async_channel.constants.DEFAULT_QUEUE_SIZE, priority_level: int = async_channel.enums.ChannelConsumerPriorityLevels.HIGH.value, ): @@ -173,10 +174,10 @@ def __init__( super().__init__(callback, size=size, priority_level=priority_level) # Clear when perform is running (set after) - self.idle = asyncio.Event() + self.idle: asyncio.Event = asyncio.Event() self.idle.set() - async def join(self, timeout) -> None: + async def join(self, timeout: float) -> None: """ Wait for any perform to be finished. """ diff --git a/async_channel/producer.py b/async_channel/producer.py index c13c660..75b2edd 100644 --- a/async_channel/producer.py +++ b/async_channel/producer.py @@ -17,9 +17,13 @@ Define async_channel Producer class """ import asyncio +import typing import async_channel.util.logging_util as logging +if typing.TYPE_CHECKING: + import async_channel.channels.channel + class Producer: """ @@ -31,30 +35,30 @@ class Producer: When the channel is synchronized priority levels are used to priorities or delay consumer calls """ - def __init__(self, channel): + def __init__(self, channel: "async_channel.channels.channel.Channel"): self.logger = logging.get_logger(self.__class__.__name__) # Related async_channel instance - self.channel = channel + self.channel: "async_channel.channels.channel.Channel" = channel """ Should only be used with .cancel() """ - self.produce_task = None + self.produce_task: typing.Optional[asyncio.Task] = None """ Should be used as the perform while loop condition while(self.should_stop): ... """ - self.should_stop = False + self.should_stop: bool = False """ Should be used to know if the producer is already started """ - self.is_running = False + self.is_running: bool = False - async def send(self, data) -> None: + async def send(self, data: typing.Any) -> None: """ Send to each consumer data though its queue :param data: data to be put into consumers queues @@ -120,7 +124,7 @@ async def wait_for_processing(self) -> None: ) async def synchronized_perform_consumers_queue( - self, priority_level, join_consumers, timeout + self, priority_level: int, join_consumers: bool, timeout: float ) -> None: """ Empties the queue synchronously for each consumers @@ -162,7 +166,7 @@ async def run(self) -> None: if not self.channel.is_synchronized: self.create_task() - def is_consumers_queue_empty(self, priority_level) -> bool: + def is_consumers_queue_empty(self, priority_level: int) -> bool: """ Check if consumers queue are empty :param priority_level: the consumer minimal priority level diff --git a/async_channel/util/channel_creator.py b/async_channel/util/channel_creator.py index 9ec24b8..9d41e23 100644 --- a/async_channel/util/channel_creator.py +++ b/async_channel/util/channel_creator.py @@ -18,11 +18,12 @@ """ import typing -import async_channel.channels as channels +if typing.TYPE_CHECKING: + import async_channel.channels.channel async def create_all_subclasses_channel( - channel_class: typing.ClassVar, + channel_class: typing.Type["async_channel.channels.channel.Channel"], set_chan_method: typing.Callable, is_synchronized: bool = False, **kwargs: dict @@ -44,12 +45,12 @@ async def create_all_subclasses_channel( async def create_channel_instance( - channel_class: typing.ClassVar, + channel_class: typing.Type["async_channel.channels.channel.Channel"], set_chan_method: typing.Callable, is_synchronized: bool = False, - channel_name: str = None, + channel_name: typing.Optional[str] = None, **kwargs: dict -) -> channels.Channel: +) -> "async_channel.channels.channel.Channel": """ Creates, initialize and start a async_channel instance :param channel_class: The class to instantiate with optional kwargs params diff --git a/dev_requirements.txt b/dev_requirements.txt index 2bc47ae..5444f6a 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -21,4 +21,4 @@ sphinx==3.2.1 sphinx_rtd_theme pylint -black==23.3.0 +black==25.12.0