-
-
Notifications
You must be signed in to change notification settings - Fork 953
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TestClient.request does not honor stream=True #1102
Comments
@falkben How did you overcome this issue? |
Yes, I believe we ran into this issue as well. We found that async_asgi_testclient worked when we needed to consume a stream from a response. Though at the moment, I'm having trouble getting it to work with async generators. See: #1102 (comment) |
I think this is relevant now: encode/httpx#1491 |
Any update on this? I'm trying to test my sse endpoint by using the testclient to call the sse endpoint and then pass it to https://pypi.org/project/sseclient-py/ so I can parse the event data. But indeed, the testclient hangs. |
Version >= 0.19.0 of Starlette seems to not work with async-asgi-testclient stream. Version 0.18 and below I could test the streaming response. |
By "test the streaming response", you mean viewing the response with iter_content, correct? I got that to work with starlette 0.20.4. This post helped me do that. Thank you. The task I'm currently working on is taking the streaming response, and processing it into a SSE object (event, data, id, retry). https://pypi.org/project/sseclient-py/ attempts to do this, but the response returned by async_asgi_testclient.TestClient.get doesn't seem to work. I believe the starlette testclient might have worked, but it hangs (stream=true doesn't seem to work), I believe because of the issue described here. Any thoughts on this? |
You're correct, it does work with newer versions of starlette. I had a problem in one of my middleware's that was trying to consume the client (from scope). Also, my example above wasn't quite starlette code. here's a better example: from async_asgi_testclient import TestClient
from starlette.applications import Starlette
from starlette.responses import StreamingResponse
from typing import Generator
from typing import Literal
import pytest
app = Starlette(debug=True)
@app.route("/stream_y")
async def get_stream_y(request) -> StreamingResponse:
"""Stream "y" forever"""
# stream at most 1000 characters
max_lines = 1000
def gen_y() -> Generator[Literal["y"], None, None]:
i = 0
while i < max_lines:
yield "y"
i += 1
return StreamingResponse(gen_y())
@pytest.mark.asyncio
async def test_stream_y():
max_lines = 100
i = 0
async with TestClient(app) as client:
resp = await client.get("/stream_y", stream=True)
assert resp.status_code == 200
async for line in resp.iter_content(2):
if i > max_lines:
break
line = line.decode("utf-8").strip()
assert line == "y"
i += 1 As for the error in converting the SSE object into a response, that seems more like a |
You can use the follow reference to test SSE: https://github.com/florimondmanca/httpx-sse/blob/master/tests/test_asgi.py. |
The description mentions We need the following to be implemented on |
async-asgi-testclient streaming response doesn't seem to work with my current versions. It hangs on the request |
Your version is from February 2021... |
yup, had to downgrade fastapi/starlette because of: agronholm/anyio#374 :( currently trying to determine a method to testing the sse consume (async get request) with the current versions. |
Not sure how that issue is related here... In any case, you probably downgraded from a version that is not the latest one. A release from 2 years ago is just too old. You can check this comment: #1102 (comment) |
You're suggesting I try httpx-sse, as in that example, correct? @Kludex |
yes |
Sadly, no luck there. it's possible it's an unrelated issue. will continue to investigate and update here |
Did you find something? |
import asyncio
import pytest
import httpx
from httpx_sse import aconnect_sse
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse
async def numbers(minimum, maximum):
for i in range(minimum, maximum + 1):
await asyncio.sleep(0.9)
yield dict(data=i)
async def sse(request):
generator = numbers(1, 5)
return EventSourceResponse(generator)
routes = [Route("/", endpoint=sse)]
app = Starlette(debug=True, routes=routes)
@pytest.fixture()
def anyio_backend():
return "asyncio"
@pytest.mark.anyio
async def test_sse_response():
async with httpx.AsyncClient(app=app, base_url="http://test") as client:
async with aconnect_sse(client, "/") as event_source:
events = [sse async for sse in event_source.aiter_sse()]
data = [event.data for event in events]
assert data == ["1", "2", "3", "4", "5"] |
@Kludex , It was an unrelated issue i was having. where another part of my code was hanging. I just tried to use httpx-sse (now that I have fixed my other issue, and also since I noticed it was updated last night), but it doesn't seem to work exactly as I expect for my use case. In the example above, you are using a generator that stops after 5. In my use case, I have a generator that never stops. I wonder if you can change your example above to use a infinite generator, say where:
and t hen have your test confirm that the first 3 messages received are 0,1,2. This is the type of test I have currently implemented with the async_asgi_testclient. It looks something like this:
Not the cleanest, but it's working. |
I gave this a whirl tonight. |
Hello friends, last night I hunted down the issue to For those on the market for a functioning workaround, I used """
Demo of properly unit testing a starlette StreamingResponse.
httpx==0.25.2
pytest==7.4.3
starlette==0.27.0
uvicorn==0.24.0.post1
"""
import asyncio
import statistics
import time
from collections.abc import Iterator
from threading import Thread
import httpx
import pytest
from starlette.responses import StreamingResponse
from uvicorn import Config, Server
# SEE: https://www.starlette.io/responses/#streamingresponse
async def slow_numbers(minimum, maximum):
yield "<html><body><ul>"
for number in range(minimum, maximum + 1):
yield "<li>%d</li>" % number
await asyncio.sleep(0.5)
yield "</ul></body></html>"
async def app(scope, receive, send):
assert scope["type"] == "http"
response = StreamingResponse(slow_numbers(1, 5), media_type="text/html")
await response(scope, receive, send)
# SEE: https://github.com/encode/httpx/blob/0.25.2/tests/conftest.py#L230-L293
# Workaround for https://github.com/encode/starlette/issues/1102
class TestServer(Server):
__test__ = False
@property
def url(self) -> httpx.URL:
protocol = "https" if self.config.is_ssl else "http"
return httpx.URL(f"{protocol}://{self.config.host}:{self.config.port}/")
def install_signal_handlers(self) -> None:
# Disable the default installation of handlers for signals such as SIGTERM,
# because it can only be done in the main thread.
pass
async def serve(self, sockets=None):
self.restart_requested = asyncio.Event()
loop = asyncio.get_event_loop()
tasks = {
loop.create_task(super().serve(sockets=sockets)),
loop.create_task(self.watch_restarts()),
}
await asyncio.wait(tasks)
async def restart(self) -> None: # pragma: no cover
# This coroutine may be called from a different thread than the one the
# server is running on, and from an async environment that's not asyncio.
# For this reason, we use an event to coordinate with the server
# instead of calling shutdown()/startup() directly, and should not make
# any asyncio-specific operations.
self.started = False
self.restart_requested.set()
while not self.started:
await asyncio.sleep(0.2)
async def watch_restarts(self) -> None: # pragma: no cover
while True:
if self.should_exit:
return
try:
await asyncio.wait_for(self.restart_requested.wait(), timeout=0.1)
except asyncio.TimeoutError:
continue
self.restart_requested.clear()
await self.shutdown()
await self.startup()
def serve_in_thread(server: TestServer) -> Iterator[TestServer]:
thread = Thread(target=server.run)
thread.start()
try:
while not server.started:
time.sleep(1e-3)
yield server
finally:
server.should_exit = True
thread.join()
@pytest.fixture(name="server", scope="session")
def fixture_server() -> Iterator[TestServer]:
config = Config(app=app, lifespan="off", loop="asyncio")
server = TestServer(config=config)
yield from serve_in_thread(server)
# The actual test
def test_streaming(server: TestServer) -> None:
client = httpx.Client(base_url=server.url)
with client.stream("GET", "/") as response:
response: httpx.Response
texts, times = [], []
tic = time.perf_counter()
for value in response.iter_bytes():
texts.append(value.decode())
times.append((toc := time.perf_counter()) - tic)
tic = toc
assert len(times) > 1, "Should be more than one chunk"
assert times[0] < 0.6, "Perhaps you streamed everything in first chunk"
assert statistics.mean(times) < 0.6, "Should be streaming"
assert all([bool(text) for text in texts]), "Some text was empty" |
Checklist
master
.Describe the bug
The
requests.request
interface exposes anstream=True
option which results on the call not waiting for the entire body to arrive.stream=True
is not handled properly bystarlette.testclient._ASGIAdapter.send
, as it is unconditionally waiting for the entire request to finish:starlette/starlette/testclient.py
Line 240 in e430706
To reproduce
asyncio.Event
wait()
after your ASGI application responds with your headers (http.response.start
) and first chunk (http.response.body
), this can be easily done inside the generator passed tostarlette.StreamingResponse
.asyncio.Event
set()
only after doing the request withstream=True
.Expected behavior
With
stream=True
, TestClient.request should return right after firsthttp.response.body
(asgiref defines server must senthttp.response.start
headers only after the firsthttp.response.body
event is generated).Streaming-related response methods (like
iter_content
) should be implemented via awaiting furtherhttp.response.body
untilmore_body
is missing orFalse
.Actual behavior
Deadlock.
Additional context
I suspect #533 could be related to this.
The text was updated successfully, but these errors were encountered: