You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
class AbstractRateLimiter(ABC):
@abstractmethod
async def acquire(self, *args, **kwargs):
pass
@abstractmethod
async def release(self):
pass
class RateLimiterB(AbstractRateLimiter):
def __init__(self, rpm: int = 0):
# from aiolimiter import AsyncLimiter
self.limiter = AsyncLimiter(rpm) if rpm > 0 else None
async def acquire(self, *args, **kwargs):
if self.limiter:
await self.limiter.acquire()
async def release(self):
pass
class RateLimiterA(AbstractRateLimiter):
def __init__(self, concurrency_limit: int = 0):
self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit > 0 else None
async def acquire(self, *args, **kwargs):
if self._semaphore:
await self._semaphore.acquire()
async def release(self):
if self._semaphore:
self._semaphore.release()
and the following code that uses these limiters:
async def execute_async(self, body: dict) -> ChatAdapterResponse:
Returns:
ChatAdapterResponse: a ChatAdapterResponse from the model
"""
rate_limiter = RateLimterA / RateLimterB
if rate_limiter:
await rate_limiter.acquire()
try:
# async send, is wrapper around requests
response = await async_send_request(
"SomeUrl", body, self._get_headers()
)
return response
finally:
if rate_limiter:
await rate_limiter.release()
When the code was using unittest it works fine, and waits properly:
class TestRateLimiter():
def test_rate_limitA():
for i in range(self.TEST_COUNT):
start_time = asyncio.get_running_loop().time()
res = await execute_async(
SOME_DICT
)
end_time = asyncio.get_running_loop().time()
print(
f"Finished requests for {model} in {end_time - start_time} seconds"
)
# assert that from the second try onwards, rpm limit = 1 ensure that the request takes at least 60 seconds
if i > 0:
self.assertTrue(end_time - start_time > 60)
)
def test_rate_limiter_B():
start_time = asyncio.get_running_loop().time()
tasks = [
async_execute_on_model(
body
)
for body in bodies # array of 4 dicts
]
results = await asyncio.gather(*tasks)
end_time = asyncio.get_running_loop().time()
print(f"Finished requests in {end_time - start_time} seconds")
self.assertTrue(
end_time - start_time > 5 * (len(bodys) - 1)
)
However when I tried to switch to using pytest and pytest-async, it seems the async loop and waits are not longer waiting, and the following code executes very quickly and does not wait or respect the synchronization objects. Any idea why ?
async def test_rate_limit_A(mock_send_request, 1rpm_rate_limiterA):
for i in range(TEST_COUNT):
start_time = asyncio.get_running_loop().time()
res = await async_execute_on_model(
SomeDict
)
end_time = asyncio.get_running_loop().time()
print(
f"Finished requests for {dict} in {end_time - start_time} seconds"
)
# assert that from the second try onwards, rpm limit = 1 ensure that the request takes at least 60 seconds
if i > 0:
assert end_time - start_time > 60
async def test_rate_limitB(mock_async_requestB, 1rpm_rate_limiterB):
start_time = asyncio.get_running_loop().time()
tasks = [
async_execute_on_model(
body
)
for body in bodies
]
results = await asyncio.gather(*tasks)
end_time = asyncio.get_running_loop().time()
print(f"Finished requests in {end_time - start_time} seconds")
assert end_time - start_time > 5 * (len(bodies) - 1)
for res in results:
Why is this happening, and what am I doing wrong with pytest-async?
The text was updated successfully, but these errors were encountered:
It's hard to tell what's going on, when I don't have a runnable example. Do you think you could provide a minimal reproducer?
Unfortunately, pytest-asyncio currently comes with a couple of limitations. For one, fixtures and tests are run in different event loops. Depending on the contents of the 1rpm_rate_limiterA fixture, this could break the locking mechanism, as asyncio.Semaphore.acquire uses the running event loop to create a future.
I have the following code:
and the following code that uses these limiters:
When the code was using unittest it works fine, and waits properly:
However when I tried to switch to using pytest and pytest-async, it seems the async loop and waits are not longer waiting, and the following code executes very quickly and does not wait or respect the synchronization objects. Any idea why ?
Why is this happening, and what am I doing wrong with pytest-async?
The text was updated successfully, but these errors were encountered: