Skip to content

Conversation

@JHolcman-T
Copy link

resolves #1045

Implements the fix proposed in #1045. Enables handling of aborted (endless) streams e.g. proxied SSE streams.

Note:
This is my first contribution, feel free to suggest improvements of any kind 😊

@pi0 pi0 added the v1 label May 5, 2025
@pi0
Copy link
Member

pi0 commented May 5, 2025

Thank you for PR, looks like a nice fix.

Would you be able to add one unit test?

@codecov
Copy link

codecov bot commented May 5, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

📢 Thoughts on this report? Let us know!

@JHolcman-T
Copy link
Author

Thank you for PR, looks like a nice fix.

Would you be able to add one unit test?

Sure, where would you expect this unittest to be located - I mean in which *.test.ts file?

@JHolcman-T
Copy link
Author

Hi @pi0

I came up with this test case. The only thing is I am not sure where to put it. It seems to me that no test file really fits :/ maybe utils.test.ts ?

    it("can abort endless stream request", async () => {
      let connectionClosed = false;
      let streamCancelled = false;

      app.use(
        "/",
        eventHandler(async (event) => {
          event.node.res.setHeader(
            "Content-Type",
            "text/event-stream; charset=utf-8",
          );
          event.node.res.setHeader("Cache-Control", "no-cache");
          event.node.res.setHeader("Connection", "keep-alive");
          event.node.res.setHeader("X-Accel-Buffering", "no");

          // Detect when client disconnects
          event.node.res.on("close", () => {
            connectionClosed = true;
          });

          // clean up the stream
          let intervalId;
          const encoder = new TextEncoder();

          const stream = new ReadableStream({
            start(controller) {
              intervalId = setInterval(() => {
                controller.enqueue(encoder.encode("data: ping...\n\n"));
              }, 10);
            },
            // this is called when the stream has been cancelled trouhgh the AbortController signal in the sendStream function
            // commenting out the part of abort signal in the sendStream function => this will not be called after the request is cancelled
            cancel() {
              streamCancelled = true;
              // clean up the stream
              clearInterval(intervalId);
            },
          });

          return sendStream(event, stream);
        }),
      );

      const res = await fetch(url + "/", {
        method: "GET",
        headers: {
          Accept: "text/event-stream",
        },
      });

      if (!res.body) {
        throw new Error("No response body");
      } else {
        let messagesCounter = 0;
        const abort = new AbortController();
        res.body
          .pipeTo(
            new WritableStream({
              write(chunk) {
                messagesCounter += 1;
                if (messagesCounter > 5) {
                  // abort reading of the stream after few chunks
                  // simulate client disconnect
                  abort.abort();
                }
              },
            }),
            { signal: abort.signal },
          )
          // hanlde the error that is thrown when the stream is aborted
          .catch((err) => {});
      }
      await new Promise((resolve) => {
        setTimeout(() => {
          resolve(true);
        }, 500);
      });
      expect(res.status).toEqual(200);
      expect(connectionClosed !== false).toBe(true);
      expect(streamCancelled !== false).toBe(true);
    });

@pi0 pi0 changed the title fix(sendStream): handle aborted (endless) streams correctly fix(sendStream): handle stream abortion Jun 3, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants