-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix(cloudflare): Use TransformStream to keep track of streams #20452
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,8 @@ const MOCK_OPTIONS: CloudflareOptions = { | |
| dsn: 'https://[email protected]/1337', | ||
| }; | ||
|
|
||
| const NODE_MAJOR_VERSION = parseInt(process.versions.node.split('.')[0]!); | ||
|
|
||
| function addDelayedWaitUntil(context: ExecutionContext) { | ||
| context.waitUntil(new Promise<void>(resolve => setTimeout(() => resolve()))); | ||
| } | ||
|
|
@@ -44,7 +46,7 @@ describe('withSentry', () => { | |
| await wrapRequestHandler( | ||
| { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, | ||
| () => new Response('test'), | ||
| ); | ||
| ).then(response => response.text()); | ||
|
|
||
| expect(waitUntilSpy).toHaveBeenCalledTimes(1); | ||
| expect(waitUntilSpy).toHaveBeenLastCalledWith(expect.any(Promise)); | ||
|
|
@@ -111,11 +113,8 @@ describe('withSentry', () => { | |
|
|
||
| await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => { | ||
| addDelayedWaitUntil(context); | ||
| const response = new Response('test'); | ||
| // Add Content-Length to skip probing | ||
| response.headers.set('content-length', '4'); | ||
| return response; | ||
| }); | ||
| return new Response('test'); | ||
| }).then(response => response.text()); | ||
| expect(waitUntil).toBeCalled(); | ||
| vi.advanceTimersToNextTimer().runAllTimers(); | ||
| await Promise.all(waits); | ||
|
|
@@ -336,7 +335,7 @@ describe('withSentry', () => { | |
| SentryCore.captureMessage('sentry-trace'); | ||
| return new Response('test'); | ||
| }, | ||
| ); | ||
| ).then(response => response.text()); | ||
|
|
||
| // Wait for async span end and transaction capture | ||
| await new Promise(resolve => setTimeout(resolve, 50)); | ||
|
|
@@ -389,10 +388,8 @@ describe('flushAndDispose', () => { | |
| const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); | ||
|
|
||
| await wrapRequestHandler({ options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, () => { | ||
| const response = new Response('test'); | ||
| response.headers.set('content-length', '4'); | ||
| return response; | ||
| }); | ||
| return new Response('test'); | ||
| }).then(response => response.text()); | ||
|
|
||
| // Wait for all waitUntil promises to resolve | ||
| await Promise.all(waits); | ||
|
|
@@ -518,6 +515,171 @@ describe('flushAndDispose', () => { | |
| disposeSpy.mockRestore(); | ||
| }); | ||
|
|
||
| // Regression tests for https://github.com/getsentry/sentry-javascript/issues/20409 | ||
| // | ||
| // Pre-fix: streaming responses were observed via `body.tee()` + a long-running | ||
| // `waitUntil(streamMonitor)`. Cloudflare caps `waitUntil` at ~30s after the | ||
| // handler returns, so any stream taking longer than 30s to fully emit had the | ||
| // monitor cancelled before `span.end()` / `flushAndDispose()` ran — silently | ||
| // dropping the root `http.server` span. | ||
| // | ||
| // Post-fix: the body is piped through a passthrough `TransformStream`; the | ||
| // `flush` (normal completion) and `cancel` (client disconnect) callbacks fire | ||
| // while the response stream is still active (no waitUntil cap), so they can | ||
| // safely end the span and register `flushAndDispose` via a fresh `waitUntil` | ||
| // window. The contract guaranteed below: `waitUntil` is NOT called with any | ||
| // long-running stream-observation promise — only with `flushAndDispose`, and | ||
| // only after the response stream has finished (either by completion or cancel). | ||
| describe('regression #20409: streaming responses do not park stream observation in waitUntil', () => { | ||
| test('waitUntil is not called until streaming response is fully delivered', async () => { | ||
| const waits: Promise<unknown>[] = []; | ||
| const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise)); | ||
| const context = { waitUntil } as unknown as ExecutionContext; | ||
|
|
||
| const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); | ||
| const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); | ||
|
|
||
| // Stream emits chunk1, then waits indefinitely until we open the gate | ||
| // before emitting chunk2 + closing. Models a long-running upstream | ||
| // (e.g. SSE / LLM streaming) whose body takes longer than the | ||
| // handler-return time to fully drain. | ||
| let releaseLastChunk!: () => void; | ||
| const lastChunkGate = new Promise<void>(resolve => { | ||
| releaseLastChunk = resolve; | ||
| }); | ||
|
|
||
| const stream = new ReadableStream({ | ||
| async start(controller) { | ||
| controller.enqueue(new TextEncoder().encode('chunk1')); | ||
| await lastChunkGate; | ||
| controller.enqueue(new TextEncoder().encode('chunk2')); | ||
| controller.close(); | ||
| }, | ||
| }); | ||
|
|
||
| const result = await wrapRequestHandler( | ||
| { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, | ||
| () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), | ||
| ); | ||
|
|
||
| // Handler has returned, but the source stream has NOT closed yet. | ||
| // The pre-fix code would have already enqueued a long-running | ||
| // `waitUntil(streamMonitor)` task at this point. The post-fix code | ||
| // must not call waitUntil at all here. | ||
| expect(waitUntil).not.toHaveBeenCalled(); | ||
|
|
||
| // Drain the response — Cloudflare would do this when forwarding to the client. | ||
| const reader = result.body!.getReader(); | ||
| await reader.read(); // chunk1 | ||
| // Source still hasn't closed — still no waitUntil. | ||
| expect(waitUntil).not.toHaveBeenCalled(); | ||
|
|
||
| releaseLastChunk(); | ||
| await reader.read(); // chunk2 | ||
| await reader.read(); // done | ||
| reader.releaseLock(); | ||
|
|
||
| // Stream completed → TransformStream `flush` fired → span ended → | ||
| // `flushAndDispose(client)` queued via waitUntil exactly once. | ||
| await Promise.all(waits); | ||
| expect(waitUntil).toHaveBeenCalledTimes(1); | ||
| expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise)); | ||
| expect(flushSpy).toHaveBeenCalled(); | ||
| expect(disposeSpy).toHaveBeenCalled(); | ||
|
|
||
| flushSpy.mockRestore(); | ||
| disposeSpy.mockRestore(); | ||
| }); | ||
|
|
||
| // Node 18's TransformStream does not invoke the transformer's `cancel` hook | ||
| // when the downstream consumer cancels (WHATWG spec addition landed in Node 20). | ||
| // Cloudflare Workers run modern V8 where this works, so we only skip the | ||
| // test under Node 18. | ||
| test.skipIf(NODE_MAJOR_VERSION < 20)( | ||
| 'waitUntil is called once and dispose runs when client cancels mid-stream', | ||
| async () => { | ||
| const waits: Promise<unknown>[] = []; | ||
| const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise)); | ||
| const context = { waitUntil } as unknown as ExecutionContext; | ||
|
|
||
| const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); | ||
| const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); | ||
|
|
||
| // Stream emits one chunk and then never closes — models an upstream | ||
| // that keeps emitting indefinitely. We then cancel the response from | ||
| // the consumer side to model a client disconnect. | ||
| let sourceCancelled = false; | ||
| const stream = new ReadableStream({ | ||
| start(controller) { | ||
| controller.enqueue(new TextEncoder().encode('chunk1')); | ||
| // intentionally don't close | ||
| }, | ||
| cancel() { | ||
| sourceCancelled = true; | ||
| }, | ||
| }); | ||
|
|
||
| const result = await wrapRequestHandler( | ||
| { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, | ||
| () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), | ||
| ); | ||
|
|
||
| // Handler returned, source still open — no waitUntil yet. | ||
| expect(waitUntil).not.toHaveBeenCalled(); | ||
|
|
||
| const reader = result.body!.getReader(); | ||
| await reader.read(); // chunk1 | ||
| await reader.cancel('client disconnected'); // simulates client disconnect | ||
| reader.releaseLock(); | ||
|
|
||
| // TransformStream `cancel` fired → span ended → flushAndDispose queued. | ||
| await Promise.all(waits); | ||
| expect(waitUntil).toHaveBeenCalledTimes(1); | ||
| expect(waitUntil).toHaveBeenLastCalledWith(expect.any(Promise)); | ||
| expect(flushSpy).toHaveBeenCalled(); | ||
| expect(disposeSpy).toHaveBeenCalled(); | ||
| // pipeThrough should also propagate the cancel upstream to the source. | ||
| expect(sourceCancelled).toBe(true); | ||
|
|
||
| flushSpy.mockRestore(); | ||
| disposeSpy.mockRestore(); | ||
| }, | ||
| ); | ||
|
|
||
| test('waitUntil is called exactly once even if the response is consumed multiple times', async () => { | ||
| // Sanity: no matter how the response is drained, the TransformStream's | ||
| // flush callback must only end the span (and queue flushAndDispose) once. | ||
| const waits: Promise<unknown>[] = []; | ||
| const waitUntil = vi.fn((promise: Promise<unknown>) => waits.push(promise)); | ||
| const context = { waitUntil } as unknown as ExecutionContext; | ||
|
|
||
| const flushSpy = vi.spyOn(SentryCore.Client.prototype, 'flush').mockResolvedValue(true); | ||
| const disposeSpy = vi.spyOn(CloudflareClient.prototype, 'dispose'); | ||
|
|
||
| const stream = new ReadableStream({ | ||
| start(controller) { | ||
| controller.enqueue(new TextEncoder().encode('a')); | ||
| controller.enqueue(new TextEncoder().encode('b')); | ||
| controller.close(); | ||
| }, | ||
| }); | ||
|
|
||
| const result = await wrapRequestHandler( | ||
| { options: MOCK_OPTIONS, request: new Request('https://example.com'), context }, | ||
| () => new Response(stream, { headers: { 'content-type': 'text/event-stream' } }), | ||
| ); | ||
|
|
||
| const text = await result.text(); | ||
| expect(text).toBe('ab'); | ||
|
|
||
| await Promise.all(waits); | ||
| expect(waitUntil).toHaveBeenCalledTimes(1); | ||
|
|
||
| flushSpy.mockRestore(); | ||
| disposeSpy.mockRestore(); | ||
| }); | ||
| }); | ||
|
|
||
| test('dispose is NOT called for protocol upgrade responses (status 101)', async () => { | ||
| const context = createMockExecutionContext(); | ||
| const waits: Promise<unknown>[] = []; | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.