Skip to content

Commit f2ddb0e

Browse files
committed
Adding TriggerCooldown
1 parent 3331d16 commit f2ddb0e

File tree

2 files changed

+227
-1
lines changed

2 files changed

+227
-1
lines changed

cooldowns/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from .buckets import CooldownBucket, SlashBucket
44
from .protocols import CooldownBucketProtocol
5-
from .cooldown import Cooldown, cooldown, shared_cooldown
5+
from .cooldown import Cooldown, cooldown, shared_cooldown, TriggerCooldown
66
from .static_cooldown import StaticCooldown, static_cooldown
77
from .cooldown_times_per import CooldownTimesPer
88
from .static_times_per import StaticTimesPer
@@ -46,6 +46,7 @@
4646
"StaticCooldown",
4747
"static_cooldown",
4848
"define_shared_static_cooldown",
49+
"TriggerCooldown",
4950
)
5051

5152
__version__ = "1.7.0"

cooldowns/cooldown.py

+225
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import datetime
55
import functools
6+
import inspect
67
from logging import getLogger
78
from typing import Callable, Optional, TypeVar, Dict, Union, Type
89

@@ -438,3 +439,227 @@ def bucket(self) -> CooldownBucketProtocol:
438439
def func(self) -> Optional[Callable]:
439440
"""Returns the wrapped function."""
440441
return self._func
442+
443+
444+
class TriggerCooldown:
445+
"""
446+
Creates a trigger cooldown.
447+
448+
This is useful if you want to be able to trigger a specific time_period cooldown
449+
inside the command itself.
450+
451+
TriggerCooldown creates two cooldonws in one instance:
452+
453+
- Normal cooldown. The same cooldown as @cooldowns.cooldown()
454+
- Trigger cooldown. A secondary cooldown that can only be activate
455+
with `.trigger()`
456+
457+
Parameters
458+
----------
459+
limit : `int`
460+
How many call's can be made in the time
461+
period specified by ``time_period``.
462+
463+
time_period : `Union[float, datetime.timedelta]`
464+
The time period related to ``limit``. This is seconds.
465+
466+
bucket : `Optional[CooldownBucketProtocol], optional`
467+
The :class:`Bucket` implementation to use
468+
as a bucket to separate cooldown buckets.
469+
470+
check : `Optional[MaybeCoro], optional`
471+
A Callable which dictates whether
472+
to apply the cooldown on current invoke.
473+
474+
If this Callable returns a truthy value,
475+
then the cooldown will be used for the current call.
476+
477+
I.e. If you wished to bypass cooldowns, you
478+
would return False if you invoked the Callable.
479+
480+
cooldown_id: Optional[Union[int, str]]
481+
Useful for resetting individual stacked cooldowns.
482+
This should be unique globally,
483+
behaviour is not guaranteed if not unique.
484+
485+
.. note::
486+
487+
This check will be given the same arguments as
488+
the item you are applying the cooldown to.
489+
490+
Usage
491+
-----
492+
- First create an instance of TriggerCooldown() with
493+
the desired parameters.
494+
495+
```
496+
trigger_cooldown = cooldowns.TriggerCooldown(1, 5, cooldowns.SlashBucket.author)
497+
```
498+
499+
- Then add the instance as a decorator to your command!
500+
501+
```
502+
@nextcord.slash_command()
503+
@trigger_cooldown
504+
async def command():
505+
```
506+
507+
The instance has to be defined in the same scope as the decorator!
508+
Now, `command()` has applied a normal cooldown of `1 limit` and
509+
`5 time_period`, as we defined it.
510+
511+
- Finally, inside your command, you can `trigger` the trigger cooldown:
512+
513+
```
514+
async def command():
515+
# Do things
516+
trigger_cooldown.trigger(30)
517+
# You can still do things after this.
518+
# Even you can `interaction.send()`.
519+
```
520+
521+
From the moment when the cooldown was triggered by `.trigger(30)`, every
522+
single call to this command within 30 seconds will raise CallableOnCooldown!
523+
524+
Raises
525+
------
526+
`RuntimeError`
527+
Expected the decorated function to be a coroutine.
528+
`CallableOnCooldown`
529+
This call resulted in a cooldown being put into effect.
530+
531+
532+
533+
"""
534+
def __init__(
535+
self,
536+
limit: int,
537+
time_period: Union[float, datetime.timedelta],
538+
bucket: Optional[CooldownBucketProtocol] = None,
539+
*,
540+
cooldown_id: Optional[Union[int, str]] = None,
541+
check: Optional[MaybeCoro] = default_check,
542+
):
543+
544+
self.triggered = False
545+
546+
self.limit = limit
547+
self.time_period = time_period
548+
self.bucket = bucket
549+
self.cooldown_id = cooldown_id
550+
self.check = check
551+
552+
# Normal Cooldown
553+
self.cooldown = Cooldown(
554+
limit= self.limit,
555+
time_period= self.time_period,
556+
bucket= self.bucket,
557+
cooldown_id= self.cooldown_id,
558+
check= self.check
559+
)
560+
561+
# Trigger Cooldown
562+
self.trigger_cooldown = Cooldown(
563+
limit= 1,
564+
time_period= self.time_period,
565+
bucket= self.bucket,
566+
cooldown_id= self.cooldown_id,
567+
check= self.check
568+
)
569+
570+
if cooldown_id:
571+
utils.shared_cooldown_refs[cooldown_id] = self.cooldown
572+
573+
async def trigger(self, time_period: Union[float, datetime.timedelta]) -> None:
574+
"""|coro|
575+
576+
Trigger the Trigger Cooldown instantly. Has to be awaited.
577+
578+
Parameters
579+
----------
580+
time_period : `Union[float, datetime.timedelta]`
581+
The time period that cooldwon will remain triggered.
582+
"""
583+
self.triggered = True
584+
self.trigger_cooldown.time_period = (
585+
time_period
586+
if isinstance(time_period, (float, int))
587+
else time_period.total_seconds()
588+
)
589+
590+
# Triggers the Cooldown leaving bucket.current = 0
591+
frame = inspect.currentframe().f_back
592+
_, _, _, values = inspect.getargvalues(frame)
593+
args = tuple(values.values())
594+
595+
async with self.trigger_cooldown(*args):
596+
return None
597+
598+
599+
def __call__(self, func: Callable) -> Callable:
600+
"""
601+
602+
Called as a decorator.
603+
604+
Parameters
605+
----------
606+
func : `Callable`
607+
The function being decorated.
608+
609+
Returns
610+
-------
611+
`Callable`
612+
Decorator
613+
614+
Raises
615+
------
616+
`RuntimeError`
617+
When given function is not coroutine.
618+
"""
619+
620+
if not asyncio.iscoroutinefunction(func):
621+
raise RuntimeError(
622+
f"Expected `func` to be a coroutine, "
623+
f"found {func} of type {func.__class__.__name__!r} instead" # noqa
624+
)
625+
# Links the cooldowns to the given function.
626+
self.cooldown._func = func
627+
self.trigger_cooldown._func = func
628+
629+
@functools.wraps(func)
630+
async def inner(*args, **kwargs):
631+
use_cooldown = await maybe_coro(self.check, *args, **kwargs)
632+
if not use_cooldown:
633+
return await maybe_coro(func, *args, **kwargs)
634+
635+
self_arg = None
636+
if "self" in kwargs:
637+
self_arg = kwargs.pop("self")
638+
639+
# If the cooldown is triggered...
640+
if self.triggered:
641+
# If still on triggered cooldown...
642+
if self.trigger_cooldown.remaining_calls(*args, **kwargs) < 1:
643+
# Runs the Trigger Cooldown.
644+
async with self.trigger_cooldown(*args, **kwargs):
645+
if self_arg:
646+
kwargs["self"] = self_arg
647+
result = await func(*args, **kwargs)
648+
else:
649+
result = await func(*args, **kwargs)
650+
return result
651+
# If not, untrigger the cooldown.
652+
else:
653+
self.triggered = False
654+
# If the cooldown is not triggered.
655+
# Runs the normal Cooldown.
656+
async with self.cooldown(*args, **kwargs):
657+
if self_arg:
658+
kwargs["self"] = self_arg
659+
result = await func(*args, **kwargs)
660+
else:
661+
result = await func(*args, **kwargs)
662+
return result
663+
# Return the decorator.
664+
return inner
665+

0 commit comments

Comments
 (0)