Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 14 additions & 24 deletions packages/react-on-rails-pro/src/RSCRequestTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,10 @@ import {
RSCPayloadStreamInfo,
RSCPayloadCallback,
RailsContextWithServerComponentMetadata,
GenerateRSCPayloadFunction,
} from 'react-on-rails/types';
import { extractErrorMessage } from './utils.ts';

/**
* Global function provided by React on Rails Pro for generating RSC payloads.
*
* This function is injected into the global scope during server-side rendering
* by the RORP rendering request. It handles the actual generation of React Server
* Component payloads on the server side.
*
* @see https://github.com/shakacode/react_on_rails_pro/blob/master/lib/react_on_rails_pro/server_rendering_js_code.rb
*/
declare global {
function generateRSCPayload(
componentName: string,
props: unknown,
railsContext: RailsContextWithServerComponentMetadata,
): Promise<NodeJS.ReadableStream>;
}

/**
* RSC Request Tracker - manages RSC payload generation and tracking for a single request.
*
Expand All @@ -52,8 +36,14 @@ class RSCRequestTracker {

private railsContext: RailsContextWithServerComponentMetadata;

constructor(railsContext: RailsContextWithServerComponentMetadata) {
private generateRSCPayload?: GenerateRSCPayloadFunction;

constructor(
railsContext: RailsContextWithServerComponentMetadata,
generateRSCPayload?: GenerateRSCPayloadFunction,
) {
this.railsContext = railsContext;
this.generateRSCPayload = generateRSCPayload;
}

/**
Expand Down Expand Up @@ -120,17 +110,17 @@ class RSCRequestTracker {
* @throws Error if generateRSCPayload is not available or fails
*/
async getRSCPayloadStream(componentName: string, props: unknown): Promise<NodeJS.ReadableStream> {
// Validate that the global generateRSCPayload function is available
if (typeof generateRSCPayload !== 'function') {
// Validate that the generateRSCPayload function is available
if (!this.generateRSCPayload) {
throw new Error(
'generateRSCPayload is not defined. Please ensure that you are using at least version 4.0.0 of ' +
'React on Rails Pro and the Node renderer, and that ReactOnRailsPro.configuration.enable_rsc_support ' +
'is set to true.',
'generateRSCPayload function is not available. This could mean: ' +
'(1) ReactOnRailsPro.configuration.enable_rsc_support is not enabled, or ' +
'(2) You are using an incompatible version of React on Rails Pro (requires 4.0.0+).',
);
}

try {
const stream = await generateRSCPayload(componentName, props, this.railsContext);
const stream = await this.generateRSCPayload(componentName, props, this.railsContext);

// Tee stream to allow for multiple consumers:
// 1. stream1 - Used by React's runtime to perform server-side rendering
Expand Down
12 changes: 10 additions & 2 deletions packages/react-on-rails-pro/src/streamingUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,19 @@ export const streamServerRenderedComponent = <T, P extends RenderParams>(
renderStrategy: StreamRenderer<T, P>,
handleError: (options: ErrorOptions) => PipeableOrReadableStream,
): T => {
const { name: componentName, domNodeId, trace, props, railsContext, throwJsErrors } = options;
const {
name: componentName,
domNodeId,
trace,
props,
railsContext,
throwJsErrors,
generateRSCPayload,
} = options;

assertRailsContextWithServerComponentMetadata(railsContext);
const postSSRHookTracker = new PostSSRHookTracker();
const rscRequestTracker = new RSCRequestTracker(railsContext);
const rscRequestTracker = new RSCRequestTracker(railsContext, generateRSCPayload);
const streamingTrackers = {
postSSRHookTracker,
rscRequestTracker,
Expand Down
7 changes: 7 additions & 0 deletions packages/react-on-rails/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,18 @@ export interface RegisteredComponent {

export type ItemRegistrationCallback<T> = (component: T) => void;

export type GenerateRSCPayloadFunction = (
componentName: string,
props: unknown,
railsContext: RailsContextWithServerComponentMetadata,
) => Promise<NodeJS.ReadableStream>;

interface Params {
props?: Record<string, unknown>;
railsContext?: RailsContext;
domNodeId?: string;
trace?: boolean;
generateRSCPayload?: GenerateRSCPayloadFunction;
}

export interface RenderParams extends Params {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@ def generate_rsc_payload_js_function(render_options)
rscBundleHash: '#{ReactOnRailsPro::Utils.rsc_bundle_hash}',
}
const runOnOtherBundle = globalThis.runOnOtherBundle;
if (typeof generateRSCPayload !== 'function') {
globalThis.generateRSCPayload = function generateRSCPayload(componentName, props, railsContext) {
const { renderingRequest, rscBundleHash } = railsContext.serverSideRSCPayloadParameters;
const propsString = JSON.stringify(props);
const newRenderingRequest = renderingRequest.replace(/\\(\\s*\\)\\s*$/, `('${componentName}', ${propsString})`);
return runOnOtherBundle(rscBundleHash, newRenderingRequest);
}
const generateRSCPayload = function generateRSCPayload(componentName, props, railsContext) {
const { renderingRequest, rscBundleHash } = railsContext.serverSideRSCPayloadParameters;
const propsString = JSON.stringify(props);
const newRenderingRequest = renderingRequest.replace(/\\(\\s*\\)\\s*$/, `('${componentName}', ${propsString})`);
return runOnOtherBundle(rscBundleHash, newRenderingRequest);
}
JS
end
Expand Down Expand Up @@ -94,6 +92,7 @@ def render(props_string, rails_context, redux_stores, react_component_name, rend
railsContext: railsContext,
throwJsErrors: #{ReactOnRailsPro.configuration.throw_js_errors},
renderingReturnsPromises: #{ReactOnRailsPro.configuration.rendering_returns_promises},
generateRSCPayload: typeof generateRSCPayload !== 'undefined' ? generateRSCPayload : undefined,
});
})()
JS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import parser from 'node-html-parser';
// @ts-expect-error TODO: fix later
import { RSCPayloadChunk } from 'react-on-rails';
import buildApp from '../src/worker';
import config from './testingNodeRendererConfigs';
import { createTestConfig } from './testingNodeRendererConfigs';
import { makeRequest } from './httpRequestUtils';

const { config } = createTestConfig('concurrentHtmlStreaming');
const app = buildApp(config);
const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
const redisClient = createClient({ url: redisUrl });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
}

const runOnOtherBundle = globalThis.runOnOtherBundle;
if (typeof generateRSCPayload !== 'function') {
globalThis.generateRSCPayload = function generateRSCPayload(componentName, props, railsContext) {
const { renderingRequest, rscBundleHash } = railsContext.serverSideRSCPayloadParameters;
const propsString = JSON.stringify(props);
const newRenderingRequest = renderingRequest.replace(/\(\s*\)\s*$/, `('${componentName}', ${propsString})`);
return runOnOtherBundle(rscBundleHash, newRenderingRequest);
}
const generateRSCPayload = function generateRSCPayload(componentName, props, railsContext) {
const { renderingRequest, rscBundleHash } = railsContext.serverSideRSCPayloadParameters;
const propsString = JSON.stringify(props);
const newRenderingRequest = renderingRequest.replace(/\(\s*\)\s*$/, `('${componentName}', ${propsString})`);
return runOnOtherBundle(rscBundleHash, newRenderingRequest);
}

ReactOnRails.clearHydratedStores();
Expand All @@ -35,5 +33,6 @@
railsContext: railsContext,
throwJsErrors: false,
renderingReturnsPromises: true,
generateRSCPayload: typeof generateRSCPayload !== 'undefined' ? generateRSCPayload : undefined,
});
})()
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import http2 from 'http2';
import buildApp from '../src/worker';
import config from './testingNodeRendererConfigs';
import { createTestConfig } from './testingNodeRendererConfigs';
import * as errorReporter from '../src/shared/errorReporter';
import { createForm, SERVER_BUNDLE_TIMESTAMP } from './httpRequestUtils';

const { config } = createTestConfig('htmlStreaming');
const app = buildApp(config);

beforeAll(async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ export const getNextChunkInternal = (

stream.once('data', onData);
stream.once('error', onError);
if (stream.closed) {
if ('closed' in stream && stream.closed) {
onClose();
} else {
stream.once('close', onClose);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import http2 from 'http2';
import * as fs from 'fs';
import buildApp from '../src/worker';
import config, { BUNDLE_PATH } from './testingNodeRendererConfigs';
import { createTestConfig } from './testingNodeRendererConfigs';
import * as errorReporter from '../src/shared/errorReporter';
import {
createRenderingRequest,
Expand All @@ -13,12 +12,10 @@ import {
} from './httpRequestUtils';
import packageJson from '../src/shared/packageJson';

const { config } = createTestConfig('incrementalHtmlStreaming');
const app = buildApp(config);

beforeAll(async () => {
if (fs.existsSync(BUNDLE_PATH)) {
fs.rmSync(BUNDLE_PATH, { recursive: true, force: true });
}
await app.ready();
await app.listen({ port: 0 });
});
Expand Down Expand Up @@ -161,8 +158,7 @@ it('incremental render html', async () => {
close();
});

// TODO: fix the problem of having a global shared `runOnOtherBundle` function
it.skip('raises an error if a specific async prop is not sent', async () => {
it('raises an error if a specific async prop is not sent', async () => {
const { status, body } = await makeRequest();
expect(body).toBe('');
expect(status).toBe(200);
Expand All @@ -182,3 +178,56 @@ it.skip('raises an error if a specific async prop is not sent', async () => {
await expect(getNextChunk(request)).rejects.toThrow('Stream Closed');
close();
});

describe('concurrent incremental HTML streaming', () => {
it('handles multiple parallel requests without race conditions', async () => {
await makeRequest();

const numRequests = 5;
const requests = [];

// Start all requests
for (let i = 0; i < numRequests; i += 1) {
const { request, close } = createHttpRequest(RSC_BUNDLE_TIMESTAMP, `concurrent-test-${i}`);
request.write(`${JSON.stringify(createInitialObject())}\n`);
requests.push({ request, close, id: i });
}

// Wait for all to connect and get initial chunks
await Promise.all(requests.map(({ request }) => waitForStatus(request)));
await Promise.all(requests.map(({ request }) => getNextChunk(request)));

// Send update chunks to ALL requests before waiting for any responses
// If sequential: second request wouldn't process until first completes
// If concurrent: all process simultaneously
requests.forEach(({ request, id }) => {
request.write(
`${JSON.stringify({
bundleTimestamp: RSC_BUNDLE_TIMESTAMP,
updateChunk: `
(function(){
var asyncPropsManager = sharedExecutionContext.get("asyncPropsManager");
asyncPropsManager.setProp("books", ["Request-${id}-Book"]);
asyncPropsManager.setProp("researches", ["Request-${id}-Research"]);
})()
`,
})}\n`,
);
request.end();
});

// Now wait for all responses - they should all succeed
const results = await Promise.all(
requests.map(async ({ request, close, id }) => {
const chunk = await getNextChunk(request);
close();
return { id, chunk };
}),
);

results.forEach(({ id, chunk }) => {
expect(chunk).toContain(`Request-${id}-Book`);
expect(chunk).toContain(`Request-${id}-Research`);
});
});
});
Loading
Loading