[TRTLLM-8988][feat] Unify MPI & Ray's req/response handling with RPC Client/Server #8765
base: main
Conversation
📝 Walkthrough
This pull request introduces RPC-based execution as an alternative transport mechanism for Ray executors and GPU workers. When the TLLM_RAY_USE_RPC environment variable is set to 1, requests and responses are routed through an RPC client/server pair instead of the Ray object-store queues.
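For orientation, here is a minimal sketch of what such an environment-variable gate typically looks like. The real helper lives in tensorrt_llm/_utils.py as ray_use_rpc() (see the review below); the body here is an assumption, not the actual implementation:

```python
import os

def ray_use_rpc() -> bool:
    """Return True when Ray executors should route requests over RPC.

    Assumed semantics: the TLLM_RAY_USE_RPC environment variable set to
    "1" enables RPC mode; unset or any other value keeps the Ray-queue path.
    """
    return os.getenv("TLLM_RAY_USE_RPC") == "1"
```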
Sequence Diagrams

```mermaid
sequenceDiagram
participant Client
participant RayExecutor
participant RPCClient
participant RayGPUWorker
participant RPCServer
rect rgba(100, 150, 200, 0.2)
note over Client,RPCServer: RPC Mode (TLLM_RAY_USE_RPC=1)
end
Client->>RayExecutor: submit(request)
RayExecutor->>RayExecutor: Check use_rpc flag
RayExecutor->>RPCClient: submit(request).remote(need_response=False)
RPCClient->>RPCServer: RPC call
RPCServer->>RayGPUWorker: Process request
par Async Response Loop
RayExecutor->>RayExecutor: _fetch_responses_loop_async()
RayExecutor->>RPCClient: fetch_responses()
RPCClient->>RPCServer: Retrieve responses
RPCServer-->>RayGPUWorker: Collect from _response_queue
RayGPUWorker-->>RPCClient: Return responses
RPCClient-->>RayExecutor: Responses
RayExecutor->>RayExecutor: handle_responses() + _results[request.id]
end
RayExecutor-->>Client: GenerationResult
```

```mermaid
sequenceDiagram
participant Client
participant RayExecutor
participant RayAsyncQueue/RaySyncQueue
participant RayGPUWorker
rect rgba(200, 150, 100, 0.2)
note over Client,RayGPUWorker: Queue Mode (TLLM_RAY_USE_RPC not set)
end
Client->>RayExecutor: submit(request)
RayExecutor->>RayExecutor: Check use_rpc flag
RayExecutor->>RayAsyncQueue/RaySyncQueue: Queue request
RayAsyncQueue/RaySyncQueue->>RayGPUWorker: Receive from queue
RayGPUWorker->>RayGPUWorker: Process + setup_engine()
RayGPUWorker-->>RayAsyncQueue/RaySyncQueue: Response via queue
RayAsyncQueue/RaySyncQueue-->>Client: Result
note over RayExecutor: use_ray_queue returns True
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
Areas requiring careful attention:
Pre-merge checks and finishing touches
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tensorrt_llm/executor/ray_executor.py (1)
357-408: Fix static analysis issues in shutdown method.
The shutdown method has a few issues flagged by static analysis:

- Line 362: f-string without placeholders
- Lines 371, 380: Catching broad Exception

Apply this diff to address the issues:

```diff
-        logger_debug(f"Shutting down RayExecutor (RPC mode)",
+        logger_debug("Shutting down RayExecutor (RPC mode)",
                      color="yellow")
         if hasattr(self, 'rpc_client') and self.rpc_client is not None:
             try:
                 logger_debug("Shutting down RPC remote", color="yellow")
                 self.call_all_ray_workers("shutdown",
                                           leader_only=False,
                                           async_call=False)
-            except Exception as e:
+            except (ray.exceptions.RayError, RuntimeError) as e:
                 logger.warning(f"Error shutting down RPC remote: {e}")

         if hasattr(self, 'main_loop') and self.main_loop and hasattr(
                 self, 'main_loop_task_obj') and self.main_loop_task_obj:
             logger_debug("Cancelling main loop task.", color="yellow")
             try:
                 self.main_loop.call_soon_threadsafe(
                     self.main_loop_task_obj.cancel)
-            except Exception as e:
+            except RuntimeError as e:
                 logger_debug(f"Error cancelling main loop task: {e}",
                              color="yellow")
```
🧹 Nitpick comments (6)
tensorrt_llm/executor/ray_gpu_worker.py (3)
223-223: Replace print statements with logger calls.
The code uses print() statements instead of the logging framework. This is inconsistent with the rest of the codebase and makes it harder to control log verbosity. Apply this diff to use proper logging:

```diff
-        print(f"RayGPUWorker {mpi_rank()} setup_engine done")
+        logger.debug(f"RayGPUWorker {mpi_rank()} setup_engine done")

     def submit(self, request: GenerationRequest):
-        print(
-            f"[RPC] RayGPUWorker {mpi_rank()} submitting request {request.id}")
+        logger.debug(
+            f"[RPC] RayGPUWorker {mpi_rank()} submitting request {request.id}")
         return super().submit(request)
```

Also applies to: 243-245
247-289: Consider refactoring duplicated code.
The TODO comments (lines 249, 273) indicate this code is copied from RpcWorker. Consider extracting the shared logic into a base class or mixin to reduce duplication and improve maintainability.
Would you like me to propose a refactoring that extracts the common async response handling logic into a reusable component? (One possible shape is sketched below.)
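As a starting point, here is a hedged sketch of what that reusable component could look like. The class name and queue attribute are illustrative assumptions; only the fetch_responses_async / fetch_responses_loop_async method names mirror the existing RpcWorker API referenced above:

```python
import asyncio
from typing import Any, Callable, List


class AsyncResponseHandlerMixin:
    """Illustrative mixin holding the async response loop that RpcWorker
    and RayGPUWorker currently duplicate (hypothetical structure)."""

    def __init__(self) -> None:
        self._response_queue: asyncio.Queue = asyncio.Queue()

    async def fetch_responses_async(self) -> List[Any]:
        # Block for at least one response, then drain whatever else is queued.
        responses = [await self._response_queue.get()]
        while not self._response_queue.empty():
            responses.append(self._response_queue.get_nowait())
        return responses

    async def fetch_responses_loop_async(
            self, handler: Callable[[List[Any]], None]) -> None:
        # Keep forwarding batches to the handler until the task is cancelled.
        while True:
            handler(await self.fetch_responses_async())
```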
197-199: Consider using a custom exception class.
As per the coding guidelines, avoid specifying long messages directly in the exception constructor. Consider defining a custom exception class or shortening the message.

```diff
-            raise RuntimeError(
-                "RPC mode enabled but no rpc_addr provided to RayGPUWorker")
+            raise ValueError("rpc_addr is required when RPC mode is enabled")
```

tensorrt_llm/executor/ray_executor.py (3)
92-92: Replace print statement with logger call.
Use the logging framework instead of print() for consistency with the rest of the codebase.

```diff
-        print(f"RPC client created at {self.rpc_addr}")
+        logger.info(f"RPC client created at {self.rpc_addr}")
```
144-228: Consider refactoring duplicated code.
The TODO comments (lines 146, 160, 167, 193) indicate this code is copied from GenerationExecutorRpcProxy. Consider extracting the shared logic to reduce duplication and improve maintainability.
Would you like me to propose a refactoring that creates a shared base class or mixin for the RPC response handling logic?
314-342: LGTM with minor suggestion.
The RPC-aware submit logic correctly routes requests through the RPC client or Ray queues based on the mode. The implementation is sound.
Consider extracting the RPC and Ray queue submission paths into separate helper methods for improved readability:

```python
def submit(self, request: GenerationRequest) -> GenerationResult:
    request.set_id(self._get_next_client_id())
    logprob_params = self._get_logprob_params(request)
    if self.use_rpc:
        return self._submit_via_rpc(request, logprob_params)
    else:
        return self._submit_via_ray_queue(request, logprob_params)
```
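And a correspondingly hedged sketch of the two helpers, reconstructed only from the sequence diagram above. GenerationResult's constructor arguments, the rpc_client.submit(...).remote(need_response=False) call shape, and the call_all_ray_workers argument shape are all assumptions:

```python
def _submit_via_rpc(self, request, logprob_params):
    # Fire-and-forget: responses come back through the async
    # fetch_responses loop, not through a per-call reply.
    result = GenerationResult(request, logprob_params=logprob_params)
    self._results[request.id] = result
    self.rpc_client.submit(request).remote(need_response=False)
    return result

def _submit_via_ray_queue(self, request, logprob_params):
    # Legacy path: hand the request to the Ray queue consumed by the workers.
    result = GenerationResult(request, logprob_params=logprob_params)
    self._results[request.id] = result
    self.call_all_ray_workers("submit", request)
    return result
```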
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- tensorrt_llm/_utils.py (1 hunks)
- tensorrt_llm/executor/base_worker.py (1 hunks)
- tensorrt_llm/executor/ray_executor.py (6 hunks)
- tensorrt_llm/executor/ray_gpu_worker.py (5 hunks)
- tensorrt_llm/executor/result.py (4 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{h,hpp,hh,hxx,cpp,cxx,cc,cu,cuh,py}
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
Use only spaces, no tabs; indent with 4 spaces.
Files:
- tensorrt_llm/_utils.py
- tensorrt_llm/executor/result.py
- tensorrt_llm/executor/base_worker.py
- tensorrt_llm/executor/ray_executor.py
- tensorrt_llm/executor/ray_gpu_worker.py
**/*.py
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
**/*.py: Python code must target Python 3.8+.
Indent Python code with 4 spaces; do not use tabs.
Maintain module namespace when importing; prefer 'from package.subpackage import foo' then 'foo.SomeClass()' instead of importing the class directly.
Python filenames should be snake_case (e.g., some_file.py).
Python classes use PascalCase names.
Functions and methods use snake_case names.
Local variables use snake_case; prefix 'k' for variables that start with a number (e.g., k_99th_percentile).
Global variables use upper SNAKE_CASE prefixed with 'G' (e.g., G_MY_GLOBAL).
Constants use upper SNAKE_CASE (e.g., MY_CONSTANT).
Avoid shadowing variables from an outer scope.
Initialize all externally visible members of a class in the constructor.
Prefer docstrings for interfaces that may be used outside a file; comments for in-function or file-local interfaces.
Use Google-style docstrings for classes and functions (Sphinx-parsable).
Document attributes and variables inline so they render under the class/function docstring.
Avoid reflection when a simpler, explicit approach suffices (e.g., avoid dict(**locals()) patterns).
In try/except, catch the most specific exceptions possible.
For duck-typing try/except, keep the try body minimal and use else for the main logic.
Files:
- tensorrt_llm/_utils.py
- tensorrt_llm/executor/result.py
- tensorrt_llm/executor/base_worker.py
- tensorrt_llm/executor/ray_executor.py
- tensorrt_llm/executor/ray_gpu_worker.py
**/*.{cpp,cxx,cc,h,hpp,hh,hxx,cu,cuh,py}
📄 CodeRabbit inference engine (CODING_GUIDELINES.md)
Prepend the NVIDIA Apache-2.0 copyright header with current year to the top of all source files (e.g., .cpp, .h, .cu, .py).
Files:
- tensorrt_llm/_utils.py
- tensorrt_llm/executor/result.py
- tensorrt_llm/executor/base_worker.py
- tensorrt_llm/executor/ray_executor.py
- tensorrt_llm/executor/ray_gpu_worker.py
🧠 Learnings (2)
📚 Learning: 2025-07-17T09:01:27.402Z
Learnt from: amitz-nv
PR: NVIDIA/TensorRT-LLM#5616
File: tensorrt_llm/executor/worker.py:375-384
Timestamp: 2025-07-17T09:01:27.402Z
Learning: In tensorrt_llm/executor/worker.py, the LoRA adapter cache optimization logic that checks `is_adapter_in_cpu_cache()` and conditionally passes None for weights/config has a known race condition issue that cannot be solved with simple error handling or verification checks. This is a known limitation that requires a more comprehensive solution.
Applied to files:
tensorrt_llm/executor/base_worker.py
📚 Learning: 2025-09-24T03:31:28.908Z
Learnt from: tongyuantongyu
PR: NVIDIA/TensorRT-LLM#7520
File: tensorrt_llm/_torch/pyexecutor/resource_manager.py:605-613
Timestamp: 2025-09-24T03:31:28.908Z
Learning: In TensorRT-LLM Ray orchestrator mode, ProcessGroups are initialized with both Gloo and NCCL backends (e.g., "cuda:nccl,cpu:gloo"), allowing PyTorch distributed to automatically route CPU tensors through Gloo and GPU tensors through NCCL. This eliminates the need for manual device placement when performing allreduce operations on base types.
Applied to files:
- tensorrt_llm/executor/ray_executor.py
- tensorrt_llm/executor/ray_gpu_worker.py
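A brief, self-contained illustration of the dual-backend ProcessGroup initialization that learning describes. It assumes the standard torchrun environment variables (RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT) are set by the launcher:

```python
import torch
import torch.distributed as dist

# One ProcessGroup, two backends: CPU tensors route through Gloo,
# CUDA tensors through NCCL, with no manual device placement.
dist.init_process_group(backend="cpu:gloo,cuda:nccl")

cpu_t = torch.zeros(1)  # allreduce on a CPU tensor goes over Gloo
dist.all_reduce(cpu_t)

if torch.cuda.is_available():
    gpu_t = torch.ones(1, device="cuda")  # allreduce on GPU goes over NCCL
    dist.all_reduce(gpu_t)
```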
🧬 Code graph analysis (3)
tensorrt_llm/executor/result.py (1)
tensorrt_llm/_utils.py (1)
mpi_disabled(522-524)
tensorrt_llm/executor/ray_executor.py (6)
tensorrt_llm/_utils.py (2)
- get_free_port (476-479)
- ray_use_rpc (528-529)
tensorrt_llm/llmapi/utils.py (3)
- _SyncQueue (446-513)
- logger_debug (103-116)
- notify_many (479-487)
tensorrt_llm/executor/ray_gpu_worker.py (4)
- RayGPUWorker (154-327)
- shutdown (290-321)
- submit (75-76)
- submit (242-245)
tensorrt_llm/executor/rpc/rpc_client.py (2)
- RPCClient (71-497)
- remote_streaming (64-68)
tensorrt_llm/executor/rpc/rpc_common.py (1)
- get_unique_ipc_addr (9-16)
tensorrt_llm/executor/utils.py (1)
- is_llm_response (149-155)
tensorrt_llm/executor/ray_gpu_worker.py (5)
tensorrt_llm/_utils.py (2)
- mpi_rank (532-539)
- ray_use_rpc (528-529)
tensorrt_llm/llmapi/utils.py (1)
- logger_debug (103-116)
tensorrt_llm/executor/base_worker.py (8)
- BaseWorker (44-606)
- set_result_queue (236-239)
- start (286-288)
- setup_engine (104-215)
- submit (526-560)
- await_responses (217-219)
- responses_handler (625-650)
- shutdown (562-570)
tensorrt_llm/executor/rpc/rpc_server.py (2)
- RPCServer (17-514)
- bind (82-94)
tensorrt_llm/executor/rpc_worker.py (7)
- RpcWorker (23-244)
- start (184-185)
- setup_engine (169-175)
- submit (70-72)
- fetch_responses_async (90-99)
- fetch_responses_loop_async (110-123)
- shutdown (177-182)
🪛 Ruff (0.14.2)
tensorrt_llm/executor/ray_executor.py
362-362: f-string without any placeholders
Remove extraneous f prefix
(F541)
371-371: Do not catch blind exception: Exception
(BLE001)
380-380: Do not catch blind exception: Exception
(BLE001)
tensorrt_llm/executor/ray_gpu_worker.py
198-199: Avoid specifying long messages outside the exception class
(TRY003)
🔇 Additional comments (10)
tensorrt_llm/_utils.py (1)
527-530: LGTM!
The ray_use_rpc() helper function follows the established pattern of mpi_disabled() and provides a clean API for checking RPC mode. The implementation is correct, and the TODO comment appropriately flags this as a tentative feature flag.

tensorrt_llm/executor/result.py (4)
276-276: LGTM!
The _use_ray_queue flag cleanly distinguishes Ray-based result handling from non-Ray paths, supporting the dual RPC/Ray mode introduced in this PR. The initialization logic is correct.

553-557: LGTM!
The conditional unregistration logic correctly gates Ray-specific cleanup based on the _use_ray_queue flag, ensuring Ray queue cleanup only occurs when Ray queues are actually in use.

851-856: LGTM!
The synchronous result path correctly gates Ray queue usage with _use_ray_queue, falling back to standard queue operations when Ray queues are not in use.

862-866: LGTM!
The asynchronous result path correctly gates Ray queue usage with _use_ray_queue, maintaining consistency with the synchronous path.

tensorrt_llm/executor/ray_executor.py (3)
87-114: LGTM!
The conditional initialization correctly separates RPC mode from Ray queue mode, initializing the appropriate infrastructure based on the use_rpc flag. The logic is clear and maintains the existing Ray queue behavior when RPC is disabled.

141-143: LGTM!
The use_ray_queue() method correctly returns the inverse of use_rpc, ensuring that Ray queue usage is disabled when RPC mode is active.
166-187: Verify thread safety of atexit registration.
The atexit.register(self.shutdown) call at line 187 registers the shutdown method, but the main loop thread is started as a daemon. If the main thread exits before the daemon thread completes, there could be resource cleanup issues.
Consider whether the atexit handler will reliably clean up the RPC client and main loop thread, especially in error scenarios. You may want to ensure the shutdown sequence is deterministic:

```python
# In setup_mainloop, before starting the thread:
atexit.register(self.shutdown)

# And ensure shutdown waits for the thread:
if hasattr(self, 'main_loop_thread') and self.main_loop_thread.is_alive():
    self.main_loop_thread.join(timeout=5.0)
```

tensorrt_llm/executor/ray_gpu_worker.py (1)
193-216: Based on the script output provided, I can now verify the original review comment. Here's what the data shows (from the first execution results):

- RPC address generation and propagation: ray_executor.py creates a unique IPC address (self.rpc_addr = get_unique_ipc_addr()) and passes it to all workers via worker_kwargs["rpc_addr"] (line 121).
- Server created on rank 0: The snippet in ray_gpu_worker.py shows the RPC server is created on rank 0 only (lines 206-211).
- Address passed to all workers: The test data confirms rpc_addr is passed to both RpcWorker.main_task and the Ray workers.

The code flow shows the address is properly distributed to all workers before they initialize. The torch.distributed.all_gather_object() calls in the codebase (lines 232-233 of the snippet) indicate there is synchronization infrastructure in place. Since all Ray workers are created with the same rpc_addr parameter before the RPC server starts, and the server binding happens on rank 0 immediately after receiving this parameter, the IPC socket should be accessible by all workers that reside on the same machine (IPC addresses are local filesystem paths).

The review comment's concern about accessibility is theoretical rather than pointing to an actual bug, as the architecture shows proper address propagation without explicit synchronization issues in this snippet.

No new critical issues found after verification. The RPC address is correctly propagated to all workers before server initialization.
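To make the verified flow concrete, here is a condensed sketch of the address propagation described above. The get_unique_ipc_addr import path comes from the code-graph section earlier in this review; the worker side is deliberately left as comments because the exact RPCServer wiring is not shown in this snippet:

```python
# Executor side (ray_executor.py, per the verification above):
from tensorrt_llm.executor.rpc.rpc_common import get_unique_ipc_addr

rpc_addr = get_unique_ipc_addr()        # unique ipc:// endpoint on local disk
worker_kwargs = {"rpc_addr": rpc_addr}  # handed to every Ray worker up front

# Worker side (ray_gpu_worker.py, shape only):
# - every rank receives the same rpc_addr before setup
# - rank 0 binds an RPCServer to rpc_addr (lines 206-211 of the snippet)
# - co-located ranks can reach the socket because ipc:// paths are local files
```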
tensorrt_llm/executor/base_worker.py (1)
528-530: Review comment is incorrect. The commented-out self.start() call is not the proper initialization mechanism.

The code is correctly designed:

- RayGPUWorker initializes properly: setup_engine() is called in __init__ for non-RPC mode (line 216) and remotely for RPC mode.
- start() is not an initialization method: BaseWorker.start() raises NotImplementedError, and calling it in submit() would fail repeatedly on every request rather than provide an initialization check.

The underlying concern is valid, but the proposed solution is wrong: the TODO acknowledges poor error propagation, yet calling an abstract method in submit() is not the fix. Proper initialization happens in __init__, where setup_engine() is already invoked.

The commented code should remain commented. If error propagation needs improvement, the fix belongs elsewhere (e.g., explicit initialization validation in __init__ or before the first submit() call, as sketched below), not in calling start() on every submission.

Likely an incorrect or invalid review comment.
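For completeness, a minimal sketch of the kind of explicit initialization validation suggested above. All names here (WorkerInitError, _engine_ready) are illustrative, not actual BaseWorker attributes:

```python
class WorkerInitError(RuntimeError):
    """Raised when a request is submitted before engine setup completed."""


class Worker:
    def __init__(self):
        self._engine_ready = False
        self.setup_engine()

    def setup_engine(self):
        # ... build the engine; only flip the flag on success ...
        self._engine_ready = True

    def submit(self, request):
        # Cheap guard per call, instead of invoking an abstract start().
        if not self._engine_ready:
            raise WorkerInitError("setup_engine() has not completed")
        # ... enqueue the request ...
```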
Force-pushed from a867f9e to 06353fb

Waiting for #8415 to merge for refactoring.

Force-pushed from f48aa54 to 848755f

/bot run --stage-list "DGX_H100-2_GPUs-PyTorch-Ray-1, H100_PCIe-PyTorch-Ray-1"

PR_Github #23551 [ run ] triggered by Bot. Commit:
PR_Github #23551 [ run ] completed with state

/bot run --stage-list "DGX_H100-2_GPUs-PyTorch-Ray-1, H100_PCIe-PyTorch-Ray-1"

/bot run --stage-list "DGX_H100-2_GPUs-PyTorch-Ray-1, H100_PCIe-PyTorch-Ray-1"

PR_Github #23779 [ run ] triggered by Bot. Commit:
PR_Github #23779 [ run ] completed with state

Force-pushed from bf6a75b to d884d2d

PR_Github #24094 [ run ] completed with state
PR_Github #24097 [ kill ] completed with state

/bot run

PR_Github #24099 [ run ] triggered by Bot. Commit:
PR_Github #24099 [ run ] completed with state
Superjomn left a comment:
LGTM
Force-pushed from af8c47d to 825005b
/bot run

PR_Github #24146 [ run ] triggered by Bot. Commit:
PR_Github #24146 [ run ] completed with state

/bot run

PR_Github #24168 [ run ] triggered by Bot. Commit:
PR_Github #24168 [ run ] completed with state

/bot run

PR_Github #24213 [ run ] triggered by Bot. Commit:
PR_Github #24213 [ run ] completed with state

/bot run

PR_Github #24219 [ run ] triggered by Bot. Commit:
Signed-off-by: Erin Ho <[email protected]>
Summary by CodeRabbit
New Features
- New RPC-based execution mode for Ray, enabled via the TLLM_RAY_USE_RPC=1 environment variable, for enhanced execution flexibility.

Improvements
Description
Test Coverage
PR Checklist
Please review the following before submitting your PR:
PR description clearly explains what and why. If using CodeRabbit's summary, please make sure it makes sense.
PR Follows TRT-LLM CODING GUIDELINES to the best of your knowledge.
Test cases are provided for new code paths (see test instructions)
Any new dependencies have been scanned for license and vulnerabilities
CODEOWNERS updated if ownership changes
Documentation updated as needed
The reviewers assigned automatically/manually are appropriate for the PR.
Please check this after reviewing the above items as appropriate for this PR.
GitHub Bot Help
/bot [-h] ['run', 'kill', 'skip', 'reuse-pipeline'] ...
Provides a user-friendly way for developers to interact with a Jenkins server.
Run /bot [-h|--help] to print this help message. See details below for each supported subcommand.

run
run [--reuse-test (optional)pipeline-id --disable-fail-fast --skip-test --stage-list "A10-PyTorch-1, xxx" --gpu-type "A30, H100_PCIe" --test-backend "pytorch, cpp" --add-multi-gpu-test --only-multi-gpu-test --disable-multi-gpu-test --post-merge --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" --detailed-log --debug (experimental)]
Launch build/test pipelines. All previously running jobs will be killed.
- --reuse-test (optional)pipeline-id (OPTIONAL): Allow the new pipeline to reuse build artifacts and skip successful test stages from a specified pipeline, or from the last pipeline if no pipeline-id is indicated. If the Git commit ID has changed, this option is always ignored. The DEFAULT behavior of the bot is to reuse build artifacts and successful test results from the last pipeline.
- --disable-reuse-test (OPTIONAL): Explicitly prevent the pipeline from reusing build artifacts and skipping successful test stages from a previous pipeline. Ensures that all builds and tests run regardless of previous successes.
- --disable-fail-fast (OPTIONAL): Disable fail-fast on build/test/infra failures.
- --skip-test (OPTIONAL): Skip all test stages, but still run build stages, package stages, and sanity-check stages. Note: Does NOT update GitHub check status.
- --stage-list "A10-PyTorch-1, xxx" (OPTIONAL): Only run the specified test stages. Example: "A10-PyTorch-1, xxx". Note: Does NOT update GitHub check status.
- --gpu-type "A30, H100_PCIe" (OPTIONAL): Only run the test stages on the specified GPU types. Example: "A30, H100_PCIe". Note: Does NOT update GitHub check status.
- --test-backend "pytorch, cpp" (OPTIONAL): Skip test stages that don't match the specified backends. Only supports [pytorch, cpp, tensorrt, triton]. Example: "pytorch, cpp" (does not run test stages with the tensorrt or triton backend). Note: Does NOT update GitHub pipeline status.
- --only-multi-gpu-test (OPTIONAL): Only run the multi-GPU tests. Note: Does NOT update GitHub check status.
- --disable-multi-gpu-test (OPTIONAL): Disable the multi-GPU tests. Note: Does NOT update GitHub check status.
- --add-multi-gpu-test (OPTIONAL): Force-run the multi-GPU tests in addition to the L0 pre-merge pipeline.
- --post-merge (OPTIONAL): Run the L0 post-merge pipeline instead of the ordinary L0 pre-merge pipeline.
- --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx" (OPTIONAL): Run the ordinary L0 pre-merge pipeline plus the specified test stages. Example: --extra-stage "H100_PCIe-TensorRT-Post-Merge-1, xxx".
- --detailed-log (OPTIONAL): Enable flushing all logs to the Jenkins console. This significantly increases log volume and may slow down the job.
- --debug (OPTIONAL): Experimental feature. Enable access to the CI container for debugging purposes. Note: Specify exactly one stage in the stage-list parameter to access the appropriate container environment. Note: Does NOT update GitHub check status.
docs/source/reference/ci-overview.mdand the
scripts/test_to_stage_mapping.pyhelper.kill
kill
Kill all running builds associated with the pull request.
skip
skip --comment COMMENT
Skip testing for the latest commit on the pull request.
--comment "Reason for skipping build/test"is required. IMPORTANT NOTE: This is dangerous since lack of user care and validation can cause top of tree to break.reuse-pipeline
reuse-pipeline
Reuse a previous pipeline to validate the current commit. This action will also kill all currently running builds associated with the pull request. IMPORTANT NOTE: This is dangerous, since lack of user care and validation can cause the top of tree to break.