Skip to content

Commit

Permalink
fix(*): some TypeVar not marked covariant
Browse files Browse the repository at this point in the history
Signed-off-by: Rongrong <[email protected]>
  • Loading branch information
Rongronggg9 committed Jun 26, 2024
1 parent 18597f3 commit cb94d6c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 25 deletions.
10 changes: 5 additions & 5 deletions src/helpers/bg/_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@

from ._helper import BgHelper

H = TypeVar('H', bound=BgHelper)
BgHelperT_co = TypeVar('BgHelperT_co', bound=BgHelper, covariant=True)
P = ParamSpec('P')
R = TypeVar('R')


class BgDecorator(Generic[P, R, H]):
def __init__(self, _bound_helper_cls: type[H] = BgHelper):
class BgDecorator(Generic[P, R, BgHelperT_co]):
def __init__(self, _bound_helper_cls: type[BgHelperT_co] = BgHelper):
self._bound_helper_cls = _bound_helper_cls

self._loop: Optional[asyncio.AbstractEventLoop] = None
self._helpers: list[H] = []
self._helpers: list[BgHelperT_co] = []

def init_sync(self, loop: asyncio.AbstractEventLoop):
self._loop = loop
Expand All @@ -37,7 +37,7 @@ async def close(self):

@staticmethod
def _create_wrappers(
helper: H,
helper: BgHelperT_co,
func: Callable[P, Awaitable[R]],
available_wrapped_methods: tuple[str, ...],
) -> dict[str, Union[Callable[P, Awaitable[R]], Callable[P, Awaitable[None]], Callable[P, None]]]:
Expand Down
8 changes: 4 additions & 4 deletions src/helpers/queue/_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@
from ._helper import QueuedHelper
from ..bg import BgDecorator

H = TypeVar('H', bound=QueuedHelper)
QueuedHelperT_co = TypeVar('QueuedHelperT_co', bound=QueuedHelper, covariant=True)
P = ParamSpec('P')
R = TypeVar('R')
QP = ParamSpec('QP')


class QueuedDecorator(BgDecorator[P, R, H], Generic[P, R, H, QP]):
class QueuedDecorator(BgDecorator[P, R, QueuedHelperT_co], Generic[P, R, QueuedHelperT_co, QP]):
def __init__(
self,
queue_constructor: Callable[QP, asyncio.Queue] = asyncio.Queue,
_bound_helper_cls: type[H] = QueuedHelper
_bound_helper_cls: type[QueuedHelperT_co] = QueuedHelper
):
super().__init__(_bound_helper_cls=_bound_helper_cls)
self._queue_constructor = queue_constructor
self._helpers: list[H]
self._helpers: list[QueuedHelperT_co]

def __call__(
self,
Expand Down
32 changes: 16 additions & 16 deletions src/monitor/_stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ class StatCounter(Counter[str, int]):
timeout_unknown_error: int = _gen_property('timeout_unknown_error')


SC = TypeVar('SC', bound=StatCounter)
StatCounterT_co = TypeVar('StatCounterT_co', bound=StatCounter, covariant=True)


class Stat(ABC, Generic[SC]):
class Stat(ABC, Generic[StatCounterT_co]):
_do_gc_after_summarizing_tier2: ClassVar[bool] = False

def __init__(self, _bound_counter_cls: type[SC] = StatCounter):
def __init__(self, _bound_counter_cls: type[StatCounterT_co] = StatCounter):
self._bound_counter_cls = _bound_counter_cls

self._counter_tier1: SC = self._bound_counter_cls() # periodical summary
self._counter_tier2: SC = self._bound_counter_cls() # unconditional summary
self._counter_tier1: StatCounterT_co = self._bound_counter_cls() # periodical summary
self._counter_tier2: StatCounterT_co = self._bound_counter_cls() # unconditional summary
self._tier1_last_summary_time: Optional[float] = None
self._tier2_last_summary_time: Optional[float] = self._tier1_last_summary_time
self._tier1_summary_period: float = float(TIMEOUT) # seconds
Expand Down Expand Up @@ -69,7 +69,7 @@ def _describe_in_progress(self) -> str:
return f'in progress({self._in_progress_count})' if self._in_progress_count else ''

@staticmethod
def _describe_abnormal(counter: SC) -> str:
def _describe_abnormal(counter: StatCounterT_co) -> str:
return ', '.join(filter(None, (
f'cancelled({counter.cancelled})' if counter.cancelled else '',
f'unknown error({counter.unknown_error})' if counter.unknown_error else '',
Expand All @@ -78,10 +78,10 @@ def _describe_abnormal(counter: SC) -> str:
)))

@abstractmethod
def _stat(self, counter: SC) -> str:
def _stat(self, counter: StatCounterT_co) -> str:
pass

def _summarize(self, counter: MC, default_log_level: int, time_diff: int):
def _summarize(self, counter: MonitorCounterT_co, default_log_level: int, time_diff: int):
stat = self._stat(counter) or 'nothing was submitted'
logger.log(
logging.WARNING
Expand Down Expand Up @@ -128,13 +128,13 @@ class MonitorCounter(StatCounter):
resubmitted: int = _gen_property('resubmitted')


MC = TypeVar('MC', bound=MonitorCounter)
MonitorCounterT_co = TypeVar('MonitorCounterT_co', bound=MonitorCounter, covariant=True)


class MonitorStat(Stat[MC]):
class MonitorStat(Stat[MonitorCounterT_co]):
_do_gc_after_summarizing_tier2 = True

def __init__(self, _bound_counter_cls: type[MC] = MonitorCounter):
def __init__(self, _bound_counter_cls: type[MonitorCounterT_co] = MonitorCounter):
super().__init__(_bound_counter_cls=_bound_counter_cls)

def not_updated(self):
Expand Down Expand Up @@ -163,7 +163,7 @@ def deferred(self):
def resubmitted(self):
self._counter_tier2['resubmitted'] += 1

def _stat(self, counter: MC) -> str:
def _stat(self, counter: MonitorCounterT_co) -> str:
scheduling_stat = ', '.join(filter(None, (
self._describe_in_progress(),
f'deferred({counter.deferred})' if counter.deferred else '',
Expand All @@ -188,11 +188,11 @@ class NotifierCounter(StatCounter):
deactivated: int = _gen_property('deactivated')


NC = TypeVar('NC', bound=NotifierCounter)
NotifierCounterT_co = TypeVar('NotifierCounterT_co', bound=NotifierCounter, covariant=True)


class NotifierStat(Stat[NC]):
def __init__(self, _bound_counter_cls: type[NC] = NotifierCounter):
class NotifierStat(Stat[NotifierCounterT_co]):
def __init__(self, _bound_counter_cls: type[NotifierCounterT_co] = NotifierCounter):
super().__init__(_bound_counter_cls=_bound_counter_cls)

def notified(self):
Expand All @@ -201,7 +201,7 @@ def notified(self):
def deactivated(self):
self._counter_tier2['deactivated'] += 1

def _stat(self, counter: NC) -> str:
def _stat(self, counter: NotifierCounterT_co) -> str:
return ', '.join(filter(None, (
self._describe_in_progress(),
f'notified({counter.notified})' if counter.notified else '',
Expand Down

0 comments on commit cb94d6c

Please sign in to comment.