# trigger_cooldown.py
import asyncio
import datetime
import inspect
import functools
from typing import Callable, Optional, Union
from .cooldown import Cooldown
from .utils import (
MaybeCoro,
maybe_coro,
default_check,
)
from . import utils
from .protocols import CooldownBucketProtocol
class TriggerCooldown:
    """Decorator object that pairs a normal cooldown with a manually triggered one.

    Two :class:`Cooldown` instances are created per object: the "normal"
    cooldown built from ``limit``/``time_period`` and a secondary "trigger"
    cooldown with a limit of one call that is only put into effect through
    :meth:`trigger`. See :meth:`__init__` for parameters and usage.
    """

    def __init__(
        self,
        limit: int,
        time_period: Union[float, datetime.timedelta],
        bucket: Optional[CooldownBucketProtocol] = None,
        *,
        cooldown_id: Optional[Union[int, str]] = None,
        trigger_cooldown_id: Optional[Union[int, str]] = None,
        check: Optional[MaybeCoro] = default_check,
    ):
        """
        Creates a trigger cooldown.

        This is useful if you want to be able to trigger a specific
        time_period cooldown inside the command itself.

        TriggerCooldown creates two cooldowns in one instance:

        - Normal cooldown. The same cooldown as @cooldowns.cooldown()
        - Trigger cooldown. A secondary cooldown that can only be activated
          with `.trigger()`

        Parameters
        ----------
        limit : `int`
            How many calls can be made in the time
            period specified by ``time_period``.
        time_period : `Union[float, datetime.timedelta]`
            The time period related to ``limit``. This is seconds.
        bucket : `Optional[CooldownBucketProtocol], optional`
            The :class:`Bucket` implementation to use
            as a bucket to separate cooldown buckets.
        check : `Optional[MaybeCoro], optional`
            A Callable which dictates whether
            to apply the cooldown on current invoke.

            If this Callable returns a truthy value,
            then the cooldown will be used for the current call.

            I.e. If you wished to bypass cooldowns, you
            would return False if you invoked the Callable.

            .. note::

                This check will be given the same arguments as
                the item you are applying the cooldown to.
        cooldown_id : `Optional[Union[int, str]]`
            Identifier of the normal cooldown. Useful for resetting
            individual stacked cooldowns.
            This should be unique globally,
            behaviour is not guaranteed if not unique.
            When omitted, a free ``normal_cooldown_NN`` id is generated.
        trigger_cooldown_id : `Optional[Union[int, str]]`
            Identifier of the trigger cooldown, with the same uniqueness
            requirement as ``cooldown_id``.
            When omitted, a free ``trigger_cooldown_NN`` id is generated.

        Usage
        -----
        - First create an instance of TriggerCooldown() with
          the desired parameters.

        ```
        trigger_cooldown = cooldowns.TriggerCooldown(1, 5, cooldowns.SlashBucket.author)
        ```

        - Then add the instance as a decorator to your command!

        ```
        @nextcord.slash_command()
        @trigger_cooldown
        async def command():
        ```

        The instance has to be defined in the same scope as the decorator!
        Now, `command()` has applied a normal cooldown of `1 limit` and
        `5 time_period`, as we defined it.

        - Finally, inside your command, you can `trigger` the trigger cooldown
          (it is a coroutine, so it has to be awaited):

        ```
        async def command():
            # Do things
            await trigger_cooldown.trigger(30)
            # You can still do things after this.
            # Even you can `interaction.send()`.
        ```

        From the moment when the cooldown was triggered by `.trigger(30)`, every
        single call to this command within 30 seconds will raise CallableOnCooldown!

        Raises
        ------
        `RuntimeError`
            Expected the decorated function to be a coroutine
            (raised when the instance is applied as a decorator).
        `CallableOnCooldown`
            This call resulted in a cooldown being put into effect
            (raised when the decorated function is called).
        """
        self.limit = limit
        self.time_period = time_period
        self.bucket = bucket
        self.cooldown_id = cooldown_id
        self.trigger_cooldown_id = trigger_cooldown_id
        self.check = check

        # Normal Cooldown
        self.cooldown = Cooldown(
            limit=self.limit,
            time_period=self.time_period,
            bucket=self.bucket,
            cooldown_id=self.cooldown_id,
            check=self.check,
        )

        # Trigger Cooldown: a single allowed call, so one entry exhausts it.
        self.trigger_cooldown = Cooldown(
            limit=1,
            time_period=self.time_period,
            bucket=self.bucket,
            cooldown_id=self.trigger_cooldown_id,
            check=self.check,
        )

        # Publish both cooldowns in the shared registry; __call__ looks them
        # up again through these ids.
        self.cooldown_id = self._register(
            self.cooldown, cooldown_id, "normal_cooldown"
        )
        self.trigger_cooldown_id = self._register(
            self.trigger_cooldown, trigger_cooldown_id, "trigger_cooldown"
        )

    @staticmethod
    def _register(
        cooldown: Cooldown,
        explicit_id: Optional[Union[int, str]],
        prefix: str,
    ) -> Union[int, str]:
        """Store ``cooldown`` in ``utils.shared_cooldown_refs`` and return its id.

        An explicitly supplied id (anything other than ``None``, so ``0`` is
        accepted) is used as-is. Otherwise the first unused
        ``f"{prefix}_{i:02}"`` key is claimed, and only that one key.
        """
        registry = utils.shared_cooldown_refs
        if explicit_id is not None:
            registry[explicit_id] = cooldown
            return explicit_id

        index = 0
        generated_id = f"{prefix}_{index:02}"
        # Stop at the first free slot so a single instance never occupies
        # more than one generated key.
        while generated_id in registry:
            index += 1
            generated_id = f"{prefix}_{index:02}"

        registry[generated_id] = cooldown
        return generated_id

    async def trigger(self, time_period: Union[float, datetime.timedelta]) -> None:
        """|coro|

        Trigger the Trigger Cooldown instantly. Has to be awaited.

        The arguments used to resolve the cooldown bucket are recovered from
        the frame that awaits this coroutine, so it is meant to be awaited
        directly from inside the decorated function.

        Parameters
        ----------
        time_period : `Union[float, datetime.timedelta]`
            The time period that cooldown will remain triggered.
        """
        self.trigger_cooldown.time_period = (
            time_period
            if isinstance(time_period, (float, int))
            else time_period.total_seconds()
        )

        # Rebuild the caller's call arguments from its frame. Only declared
        # parameters are forwarded; other local variables of the caller must
        # not leak into the bucket key.
        frame = inspect.currentframe().f_back
        try:
            arg_names, varargs_name, varkw_name, frame_locals = inspect.getargvalues(
                frame
            )
            # A parameter the caller has `del`-eted is simply skipped.
            args = [frame_locals[name] for name in arg_names if name in frame_locals]
            if varargs_name is not None and varargs_name in frame_locals:
                args.extend(frame_locals[varargs_name])
            kwargs = {}
            if varkw_name is not None and varkw_name in frame_locals:
                kwargs.update(frame_locals[varkw_name])
        finally:
            # Drop the frame reference promptly to avoid a reference cycle.
            del frame

        # Entering the context consumes the trigger cooldown's single call,
        # which is what puts it "on cooldown" for subsequent invocations.
        async with self.trigger_cooldown(*args, **kwargs):
            return None

    def __call__(self, func: Callable) -> Callable:
        """
        Called as a decorator.

        Parameters
        ----------
        func : `Callable`
            The function being decorated.

        Returns
        -------
        `Callable`
            The wrapped coroutine function that enforces the cooldowns.

        Raises
        ------
        `RuntimeError`
            When given function is not coroutine.
        """
        _cooldown: Cooldown = utils.shared_cooldown_refs[self.cooldown_id]
        _trigger_cooldown: Cooldown = utils.shared_cooldown_refs[
            self.trigger_cooldown_id
        ]

        if not asyncio.iscoroutinefunction(func):
            raise RuntimeError(
                f"Expected `func` to be a coroutine, "
                f"found {func} of type {func.__class__.__name__!r} instead"  # noqa
            )

        # Links the cooldowns to the given function.
        _cooldown._func = func
        _trigger_cooldown._func = func

        attached_cooldowns = getattr(func, "_cooldowns", [])
        for linked in (_cooldown, _trigger_cooldown):
            if linked not in attached_cooldowns:
                attached_cooldowns.append(linked)
        setattr(func, "_cooldowns", attached_cooldowns)

        @functools.wraps(func)
        async def inner(*args, **kwargs):
            use_cooldown = await maybe_coro(self.check, *args, **kwargs)
            if not use_cooldown:
                return await maybe_coro(func, *args, **kwargs)

            # A keyword ``self`` is hidden from the cooldowns and handed back
            # to ``func`` afterwards. Presence is tracked explicitly so a
            # falsy bound object is still restored.
            has_self = "self" in kwargs
            self_arg = kwargs.pop("self", None)

            # While the trigger cooldown has no calls left it takes
            # precedence; otherwise the normal cooldown guards the call.
            if _trigger_cooldown.remaining_calls(*args, **kwargs) < 1:
                active_cooldown = _trigger_cooldown
            else:
                active_cooldown = _cooldown

            async with active_cooldown(*args, **kwargs):
                if has_self:
                    kwargs["self"] = self_arg
                result = await func(*args, **kwargs)
            return result

        # Return the decorator.
        return inner