Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.2.2] - 2026-01-03
### Added
- Detailed typing to all classes

## [2.2.1] - 2023-09-03
### Added
- channel_name to creator
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Async-Channel [2.2.1](https://github.com/Drakkar-Software/Async-Channel/blob/master/CHANGELOG.md)
# Async-Channel [2.2.2](https://github.com/Drakkar-Software/Async-Channel/blob/master/CHANGELOG.md)
[![Codacy Badge](https://app.codacy.com/project/badge/Grade/523d43c62f1d4de08395752367f5fddc)](https://www.codacy.com/gh/Drakkar-Software/Async-Channel/dashboard?utm_source=github.com&utm_medium=referral&utm_content=Drakkar-Software/Async-Channel&utm_campaign=Badge_Grade)
[![PyPI](https://img.shields.io/pypi/v/async-channel.svg)](https://pypi.python.org/pypi/async-channel/)
[![Github-Action-CI](https://github.com/Drakkar-Software/Async-Channel/workflows/Async-Channel-Default-CI/badge.svg)](https://github.com/Drakkar-Software/Async-Channel/actions)
Expand Down
2 changes: 1 addition & 1 deletion async_channel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)

PROJECT_NAME = "async-channel"
VERSION = "2.2.1" # major.minor.revision
VERSION = "2.2.2" # major.minor.revision

__all__ = [
"CHANNEL_WILDCARD",
Expand Down
83 changes: 54 additions & 29 deletions async_channel/channels/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import async_channel.enums
import async_channel.channels.channel_instances as channel_instances

import async_channel.producer


# pylint: disable=undefined-variable, not-callable
class Channel:
Expand All @@ -34,10 +36,14 @@ class Channel:
"""

# Channel producer class
PRODUCER_CLASS = None
PRODUCER_CLASS: typing.Optional[typing.Type["async_channel.producer.Producer"]] = (
None
)

# Channel consumer class
CONSUMER_CLASS = None
CONSUMER_CLASS: typing.Optional[typing.Type["async_channel.consumer.Consumer"]] = (
None
)

# Consumer instance in consumer filters
INSTANCE_KEY = "consumer_instance"
Expand All @@ -51,22 +57,26 @@ def __init__(self):
self.logger = logging.get_logger(self.__class__.__name__)

# Channel unique id
self.chan_id = None
self.chan_id: typing.Optional[str] = None

# Channel subscribed producers list
self.producers = []
self.producers: list["async_channel.producer.Producer"] = []

# Channel subscribed consumers list
self.consumers = []
# Channel subscribed consumers list: list dicts of dicts containing:
# - At least a consumer instance under the INSTANCE_KEY key
# - Possibly other filters under other keys and values
self.consumers: list[dict[str, typing.Any]] = []

# Used to perform global send from non-producer context
self.internal_producer = None
self.internal_producer: typing.Optional["async_channel.producer.Producer"] = (
None
)

# Used to save producers state (paused or not)
self.is_paused = True
self.is_paused: bool = True

# Used to synchronize producers and consumer
self.is_synchronized = False
self.is_synchronized: bool = False

@classmethod
def get_name(cls) -> str:
Expand All @@ -80,11 +90,11 @@ def get_name(cls) -> str:
async def new_consumer(
self,
callback: object = None,
consumer_filters: dict = None,
internal_consumer: object = None,
consumer_filters: typing.Optional[dict] = None,
internal_consumer: typing.Optional["async_channel.consumer.Consumer"] = None,
size: int = 0,
priority_level: int = DEFAULT_PRIORITY_LEVEL,
) -> CONSUMER_CLASS:
) -> "async_channel.consumer.Consumer":
"""
Create an appropriate consumer instance for this async_channel and add it to the consumer list
Should end by calling '_check_producers_state'
Expand All @@ -98,15 +108,18 @@ async def new_consumer(
consumer = (
internal_consumer
if internal_consumer
else self.CONSUMER_CLASS(callback, size=size, priority_level=priority_level)
else self.CONSUMER_CLASS(callback, size=size, priority_level=priority_level) # type: ignore
)
await self._add_new_consumer_and_run(consumer, consumer_filters)
await self._check_producers_state()
return consumer

# pylint: disable=unused-argument
async def _add_new_consumer_and_run(
self, consumer: CONSUMER_CLASS, consumer_filters: dict, **kwargs
self,
consumer: "async_channel.consumer.Consumer",
consumer_filters: typing.Optional[dict],
**kwargs,
) -> None:
"""
Should be called by 'new_consumer' to add the consumer to self.consumers and call 'consumer.run()'
Expand All @@ -120,7 +133,9 @@ async def _add_new_consumer_and_run(
self.add_new_consumer(consumer, consumer_filters)
await consumer.run(with_task=not self.is_synchronized)

def add_new_consumer(self, consumer, consumer_filters) -> None:
def add_new_consumer(
self, consumer: "async_channel.consumer.Consumer", consumer_filters: dict
) -> None:
"""
Add a new consumer to consumer list with filters
:param consumer: the consumer to add
Expand All @@ -130,7 +145,9 @@ def add_new_consumer(self, consumer, consumer_filters) -> None:
consumer_filters[self.INSTANCE_KEY] = consumer
self.consumers.append(consumer_filters)

def get_consumer_from_filters(self, consumer_filters) -> list:
def get_consumer_from_filters(
self, consumer_filters: dict
) -> list["async_channel.consumer.Consumer"]:
"""
Returns the instance filtered consumers list
WARNING:
Expand All @@ -141,15 +158,17 @@ def get_consumer_from_filters(self, consumer_filters) -> list:
"""
return self._filter_consumers(consumer_filters)

def get_consumers(self) -> list:
def get_consumers(self) -> list["async_channel.consumer.Consumer"]:
"""
Returns all consumers instance
Can be overwritten according to the class needs
:return: the subscribed consumers list
"""
return [consumer[self.INSTANCE_KEY] for consumer in self.consumers]

def get_prioritized_consumers(self, priority_level) -> list:
def get_prioritized_consumers(
self, priority_level: int
) -> list["async_channel.consumer.Consumer"]:
"""
Returns all consumers instance
Can be overwritten according to the class needs
Expand All @@ -161,7 +180,9 @@ def get_prioritized_consumers(self, priority_level) -> list:
if consumer[self.INSTANCE_KEY].priority_level <= priority_level
]

def _filter_consumers(self, consumer_filters) -> list:
def _filter_consumers(
self, consumer_filters: dict
) -> list["async_channel.consumer.Consumer"]:
"""
Returns the consumers that match the selection
Returns all consumer instances if consumer_filter is empty
Expand All @@ -174,7 +195,9 @@ def _filter_consumers(self, consumer_filters) -> list:
if _check_filters(consumer, consumer_filters)
]

async def remove_consumer(self, consumer: CONSUMER_CLASS) -> None:
async def remove_consumer(
self, consumer: "async_channel.consumer.Consumer"
) -> None:
"""
Should be overwritten according to the class needs
Should end by calling '_check_producers_state' and then 'consumer.stop'
Expand Down Expand Up @@ -234,7 +257,9 @@ def _should_resume_producers(self) -> bool:
return True
return False

async def register_producer(self, producer) -> None:
async def register_producer(
self, producer: "async_channel.producer.Producer"
) -> None:
"""
Add the producer to producers list
Can be overwritten to perform additional action when registering
Expand All @@ -247,7 +272,7 @@ async def register_producer(self, producer) -> None:
if self.is_paused:
await producer.pause()

def unregister_producer(self, producer) -> None:
def unregister_producer(self, producer: "async_channel.producer.Producer") -> None:
"""
Remove the producer from producers list
Can be overwritten to perform additional action when registering
Expand All @@ -256,7 +281,7 @@ def unregister_producer(self, producer) -> None:
if producer in self.producers:
self.producers.remove(producer)

def get_producers(self) -> typing.Iterable:
def get_producers(self) -> typing.Iterable["async_channel.producer.Producer"]:
"""
Should be overwritten according to the class needs
:return: async_channel producers iterable
Expand Down Expand Up @@ -306,22 +331,22 @@ async def modify(self, **kwargs) -> None:
for producer in self.get_producers():
await producer.modify(**kwargs)

def get_internal_producer(self, **kwargs) -> PRODUCER_CLASS:
def get_internal_producer(self, **kwargs) -> "async_channel.producer.Producer":
"""
Returns internal producer if exists else creates it
:param kwargs: arguments for internal producer __init__
:return: internal producer instance
"""
if not self.internal_producer:
try:
self.internal_producer = self.PRODUCER_CLASS(self, **kwargs)
self.internal_producer = self.PRODUCER_CLASS(self, **kwargs) # type: ignore
except TypeError:
self.logger.exception("PRODUCER_CLASS not defined")
raise
return self.internal_producer


def set_chan(chan, name) -> Channel:
def set_chan(chan: Channel, name: str) -> Channel:
"""
Set a new Channel instance in the channels list according to channel name
:param chan: new Channel instance
Expand All @@ -335,7 +360,7 @@ def set_chan(chan, name) -> Channel:
raise ValueError(f"Channel {chan_name} already exists.")


def del_chan(name) -> None:
def del_chan(name: str) -> None:
"""
Delete a Channel instance from the channels list according to channel name
:param name: name of the channel to delete
Expand All @@ -344,7 +369,7 @@ def del_chan(name) -> None:
channel_instances.ChannelInstances.instance().channels.pop(name, None)


def get_chan(chan_name) -> Channel:
def get_chan(chan_name: str) -> Channel:
"""
Return the channel instance from channel name
:param chan_name: the channel name
Expand All @@ -353,7 +378,7 @@ def get_chan(chan_name) -> Channel:
return channel_instances.ChannelInstances.instance().channels[chan_name]


def _check_filters(consumer_filters, expected_filters) -> bool:
def _check_filters(consumer_filters: dict, expected_filters: dict) -> bool:
"""
Checks if the consumer match the specified filters
Returns True if expected_filters is empty
Expand Down
22 changes: 15 additions & 7 deletions async_channel/channels/channel_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"""
import async_channel.util.logging_util as logging

import async_channel.channels.channel


class ChannelInstances:
"""
Expand All @@ -28,7 +30,7 @@ class ChannelInstances:
_instances = {}

@classmethod
def instance(cls, *args, **kwargs):
def instance(cls, *args, **kwargs) -> "ChannelInstances":
"""
Create the instance if not already created
Return the class instance
Expand All @@ -41,10 +43,14 @@ def instance(cls, *args, **kwargs):
return cls._instances[cls]

def __init__(self):
self.channels = {}
self.channels: dict[
str, dict[str, "async_channel.channels.channel.Channel"]
] = {}


def set_chan_at_id(chan, name) -> None:
def set_chan_at_id(
chan: "async_channel.channels.channel.Channel", name: str
) -> "async_channel.channels.channel.Channel":
"""
Add a new async_channel to the channels instances dictionary at chan.id
:param chan: the channel instance
Expand All @@ -64,7 +70,7 @@ def set_chan_at_id(chan, name) -> None:
raise ValueError(f"Channel {chan_name} already exists.")


def get_channels(chan_id) -> dict:
def get_channels(chan_id: str) -> dict[str, "async_channel.channels.channel.Channel"]:
"""
Get async_channel instances by async_channel id
:param chan_id: the channel id
Expand All @@ -76,15 +82,17 @@ def get_channels(chan_id) -> dict:
raise KeyError(f"Channels not found with chan_id: {chan_id}") from exception


def del_channel_container(chan_id) -> None:
def del_channel_container(chan_id: str) -> None:
"""
Delete all async_channel id instances
:param chan_id: the channel id
"""
ChannelInstances.instance().channels.pop(chan_id, None)


def get_chan_at_id(chan_name, chan_id) -> object:
def get_chan_at_id(
chan_name: str, chan_id: str
) -> "async_channel.channels.channel.Channel":
"""
Get the channel instance that matches the name and the id
:param chan_name: the channel name
Expand All @@ -99,7 +107,7 @@ def get_chan_at_id(chan_name, chan_id) -> object:
) from exception


def del_chan_at_id(chan_name, chan_id) -> None:
def del_chan_at_id(chan_name: str, chan_id: str) -> None:
"""
Delete the channel instance that matches the name and the id
:param chan_name: the channel name
Expand Down
Loading
Loading