Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.2.2] - 2026-01-03
### Added
- Detailed typing to all classes

## [2.2.1] - 2023-09-03
### Added
- channel_name to creator
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Async-Channel [2.2.1](https://github.com/Drakkar-Software/Async-Channel/blob/master/CHANGELOG.md)
# Async-Channel [2.2.2](https://github.com/Drakkar-Software/Async-Channel/blob/master/CHANGELOG.md)
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/523d43c62f1d4de08395752367f5fddc)](https://www.codacy.com/gh/Drakkar-Software/Async-Channel/dashboard?utm_source=github.com&utm_medium=referral&utm_content=Drakkar-Software/Async-Channel&utm_campaign=Badge_Grade)
[![PyPI](https://img.shields.io/pypi/v/async-channel.svg)](https://pypi.python.org/pypi/async-channel/)
[![Github-Action-CI](https://github.com/Drakkar-Software/Async-Channel/workflows/Async-Channel-Default-CI/badge.svg)](https://github.com/Drakkar-Software/Async-Channel/actions)
Expand Down
2 changes: 1 addition & 1 deletion async_channel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)

PROJECT_NAME = "async-channel"
VERSION = "2.2.1" # major.minor.revision
VERSION = "2.2.2" # major.minor.revision

__all__ = [
"CHANNEL_WILDCARD",
Expand Down
84 changes: 55 additions & 29 deletions async_channel/channels/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import async_channel.enums
import async_channel.channels.channel_instances as channel_instances

if typing.TYPE_CHECKING:
import async_channel.producer


# pylint: disable=undefined-variable, not-callable
class Channel:
Expand All @@ -34,10 +37,14 @@ class Channel:
"""

# Channel producer class
PRODUCER_CLASS = None
PRODUCER_CLASS: typing.Optional[typing.Type["async_channel.producer.Producer"]] = (
None
)

# Channel consumer class
CONSUMER_CLASS = None
CONSUMER_CLASS: typing.Optional[typing.Type["async_channel.consumer.Consumer"]] = (
None
)

# Consumer instance in consumer filters
INSTANCE_KEY = "consumer_instance"
Expand All @@ -51,22 +58,26 @@ def __init__(self):
self.logger = logging.get_logger(self.__class__.__name__)

# Channel unique id
self.chan_id = None
self.chan_id: typing.Optional[str] = None

# Channel subscribed producers list
self.producers = []
self.producers: list["async_channel.producer.Producer"] = []

# Channel subscribed consumers list
self.consumers = []
# Channel subscribed consumers list: list dicts of dicts containing:
# - At least a consumer instance under the INSTANCE_KEY key
# - Possibly other filters under other keys and values
self.consumers: list[dict[str, typing.Any]] = []

# Used to perform global send from non-producer context
self.internal_producer = None
self.internal_producer: typing.Optional["async_channel.producer.Producer"] = (
None
)

# Used to save producers state (paused or not)
self.is_paused = True
self.is_paused: bool = True

# Used to synchronize producers and consumer
self.is_synchronized = False
self.is_synchronized: bool = False

@classmethod
def get_name(cls) -> str:
Expand All @@ -80,11 +91,11 @@ def get_name(cls) -> str:
async def new_consumer(
self,
callback: object = None,
consumer_filters: dict = None,
internal_consumer: object = None,
consumer_filters: typing.Optional[dict] = None,
internal_consumer: typing.Optional["async_channel.consumer.Consumer"] = None,
size: int = 0,
priority_level: int = DEFAULT_PRIORITY_LEVEL,
) -> CONSUMER_CLASS:
) -> "async_channel.consumer.Consumer":
"""
Create an appropriate consumer instance for this async_channel and add it to the consumer list
Should end by calling '_check_producers_state'
Expand All @@ -98,15 +109,18 @@ async def new_consumer(
consumer = (
internal_consumer
if internal_consumer
else self.CONSUMER_CLASS(callback, size=size, priority_level=priority_level)
else self.CONSUMER_CLASS(callback, size=size, priority_level=priority_level) # type: ignore
)
await self._add_new_consumer_and_run(consumer, consumer_filters)
await self._check_producers_state()
return consumer

# pylint: disable=unused-argument
async def _add_new_consumer_and_run(
self, consumer: CONSUMER_CLASS, consumer_filters: dict, **kwargs
self,
consumer: "async_channel.consumer.Consumer",
consumer_filters: typing.Optional[dict],
**kwargs,
) -> None:
"""
Should be called by 'new_consumer' to add the consumer to self.consumers and call 'consumer.run()'
Expand All @@ -120,7 +134,9 @@ async def _add_new_consumer_and_run(
self.add_new_consumer(consumer, consumer_filters)
await consumer.run(with_task=not self.is_synchronized)

def add_new_consumer(self, consumer, consumer_filters) -> None:
def add_new_consumer(
self, consumer: "async_channel.consumer.Consumer", consumer_filters: dict
) -> None:
"""
Add a new consumer to consumer list with filters
:param consumer: the consumer to add
Expand All @@ -130,7 +146,9 @@ def add_new_consumer(self, consumer, consumer_filters) -> None:
consumer_filters[self.INSTANCE_KEY] = consumer
self.consumers.append(consumer_filters)

def get_consumer_from_filters(self, consumer_filters) -> list:
def get_consumer_from_filters(
self, consumer_filters: dict
) -> list["async_channel.consumer.Consumer"]:
"""
Returns the instance filtered consumers list
WARNING:
Expand All @@ -141,15 +159,17 @@ def get_consumer_from_filters(self, consumer_filters) -> list:
"""
return self._filter_consumers(consumer_filters)

def get_consumers(self) -> list:
def get_consumers(self) -> list["async_channel.consumer.Consumer"]:
"""
Returns all consumers instance
Can be overwritten according to the class needs
:return: the subscribed consumers list
"""
return [consumer[self.INSTANCE_KEY] for consumer in self.consumers]

def get_prioritized_consumers(self, priority_level) -> list:
def get_prioritized_consumers(
self, priority_level: int
) -> list["async_channel.consumer.Consumer"]:
"""
Returns all consumers instance
Can be overwritten according to the class needs
Expand All @@ -161,7 +181,9 @@ def get_prioritized_consumers(self, priority_level) -> list:
if consumer[self.INSTANCE_KEY].priority_level <= priority_level
]

def _filter_consumers(self, consumer_filters) -> list:
def _filter_consumers(
self, consumer_filters: dict
) -> list["async_channel.consumer.Consumer"]:
"""
Returns the consumers that match the selection
Returns all consumer instances if consumer_filter is empty
Expand All @@ -174,7 +196,9 @@ def _filter_consumers(self, consumer_filters) -> list:
if _check_filters(consumer, consumer_filters)
]

async def remove_consumer(self, consumer: CONSUMER_CLASS) -> None:
async def remove_consumer(
self, consumer: "async_channel.consumer.Consumer"
) -> None:
"""
Should be overwritten according to the class needs
Should end by calling '_check_producers_state' and then 'consumer.stop'
Expand Down Expand Up @@ -234,7 +258,9 @@ def _should_resume_producers(self) -> bool:
return True
return False

async def register_producer(self, producer) -> None:
async def register_producer(
self, producer: "async_channel.producer.Producer"
) -> None:
"""
Add the producer to producers list
Can be overwritten to perform additional action when registering
Expand All @@ -247,7 +273,7 @@ async def register_producer(self, producer) -> None:
if self.is_paused:
await producer.pause()

def unregister_producer(self, producer) -> None:
def unregister_producer(self, producer: "async_channel.producer.Producer") -> None:
"""
Remove the producer from producers list
Can be overwritten to perform additional action when registering
Expand All @@ -256,7 +282,7 @@ def unregister_producer(self, producer) -> None:
if producer in self.producers:
self.producers.remove(producer)

def get_producers(self) -> typing.Iterable:
def get_producers(self) -> typing.Iterable["async_channel.producer.Producer"]:
"""
Should be overwritten according to the class needs
:return: async_channel producers iterable
Expand Down Expand Up @@ -306,22 +332,22 @@ async def modify(self, **kwargs) -> None:
for producer in self.get_producers():
await producer.modify(**kwargs)

def get_internal_producer(self, **kwargs) -> PRODUCER_CLASS:
def get_internal_producer(self, **kwargs) -> "async_channel.producer.Producer":
"""
Returns internal producer if exists else creates it
:param kwargs: arguments for internal producer __init__
:return: internal producer instance
"""
if not self.internal_producer:
try:
self.internal_producer = self.PRODUCER_CLASS(self, **kwargs)
self.internal_producer = self.PRODUCER_CLASS(self, **kwargs) # type: ignore
except TypeError:
self.logger.exception("PRODUCER_CLASS not defined")
raise
return self.internal_producer


def set_chan(chan, name) -> Channel:
def set_chan(chan: Channel, name: str) -> Channel:
"""
Set a new Channel instance in the channels list according to channel name
:param chan: new Channel instance
Expand All @@ -335,7 +361,7 @@ def set_chan(chan, name) -> Channel:
raise ValueError(f"Channel {chan_name} already exists.")


def del_chan(name) -> None:
def del_chan(name: str) -> None:
"""
Delete a Channel instance from the channels list according to channel name
:param name: name of the channel to delete
Expand All @@ -344,7 +370,7 @@ def del_chan(name) -> None:
channel_instances.ChannelInstances.instance().channels.pop(name, None)


def get_chan(chan_name) -> Channel:
def get_chan(chan_name: str) -> Channel:
"""
Return the channel instance from channel name
:param chan_name: the channel name
Expand All @@ -353,7 +379,7 @@ def get_chan(chan_name) -> Channel:
return channel_instances.ChannelInstances.instance().channels[chan_name]


def _check_filters(consumer_filters, expected_filters) -> bool:
def _check_filters(consumer_filters: dict, expected_filters: dict) -> bool:
"""
Checks if the consumer match the specified filters
Returns True if expected_filters is empty
Expand Down
24 changes: 17 additions & 7 deletions async_channel/channels/channel_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@
"""
This module defines created Channels interaction methods
"""
import typing
import async_channel.util.logging_util as logging

if typing.TYPE_CHECKING:
import async_channel.channels.channel


class ChannelInstances:
"""
Expand All @@ -28,7 +32,7 @@ class ChannelInstances:
_instances = {}

@classmethod
def instance(cls, *args, **kwargs):
def instance(cls, *args, **kwargs) -> "ChannelInstances":
"""
Create the instance if not already created
Return the class instance
Expand All @@ -41,10 +45,14 @@ def instance(cls, *args, **kwargs):
return cls._instances[cls]

def __init__(self):
self.channels = {}
self.channels: dict[
str, dict[str, "async_channel.channels.channel.Channel"]
] = {}


def set_chan_at_id(chan, name) -> None:
def set_chan_at_id(
chan: "async_channel.channels.channel.Channel", name: str
) -> "async_channel.channels.channel.Channel":
"""
Add a new async_channel to the channels instances dictionary at chan.id
:param chan: the channel instance
Expand All @@ -64,7 +72,7 @@ def set_chan_at_id(chan, name) -> None:
raise ValueError(f"Channel {chan_name} already exists.")


def get_channels(chan_id) -> dict:
def get_channels(chan_id: str) -> dict[str, "async_channel.channels.channel.Channel"]:
"""
Get async_channel instances by async_channel id
:param chan_id: the channel id
Expand All @@ -76,15 +84,17 @@ def get_channels(chan_id) -> dict:
raise KeyError(f"Channels not found with chan_id: {chan_id}") from exception


def del_channel_container(chan_id) -> None:
def del_channel_container(chan_id: str) -> None:
"""
Delete all async_channel id instances
:param chan_id: the channel id
"""
ChannelInstances.instance().channels.pop(chan_id, None)


def get_chan_at_id(chan_name, chan_id) -> object:
def get_chan_at_id(
chan_name: str, chan_id: str
) -> "async_channel.channels.channel.Channel":
"""
Get the channel instance that matches the name and the id
:param chan_name: the channel name
Expand All @@ -99,7 +109,7 @@ def get_chan_at_id(chan_name, chan_id) -> object:
) from exception


def del_chan_at_id(chan_name, chan_id) -> None:
def del_chan_at_id(chan_name: str, chan_id: str) -> None:
"""
Delete the channel instance that matches the name and the id
:param chan_name: the channel name
Expand Down
Loading