diff --git a/megatron/core/inference/text_generation_controllers/text_generation_controller.py b/megatron/core/inference/text_generation_controllers/text_generation_controller.py index 2bda1425710..6e00f58ac23 100644 --- a/megatron/core/inference/text_generation_controllers/text_generation_controller.py +++ b/megatron/core/inference/text_generation_controllers/text_generation_controller.py @@ -29,6 +29,7 @@ ) from megatron.core.inference.sampling_params import SamplingParams from megatron.core.inference.utils import get_attention_mask, set_decode_expert_padding +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.moe.moe_layer import BaseMoELayer from megatron.core.transformer.utils import set_model_to_sequence_parallel from megatron.core.utils import get_asyncio_loop, get_model_config, unwrap_model @@ -851,7 +852,7 @@ def generate_all_output_tokens_static_batch( # Check whether CUDA graphs are enabled enable_cuda_graph = ( model_config.cuda_graph_impl == "local" - and "full_iteration" not in model_config.cuda_graph_scope + and CudaGraphScope.full_iteration not in model_config.cuda_graph_scope ) # Pad batch tokens if necessary diff --git a/megatron/core/models/common/language_module/language_module.py b/megatron/core/models/common/language_module/language_module.py index de2ecfb8011..259bb716a93 100644 --- a/megatron/core/models/common/language_module/language_module.py +++ b/megatron/core/models/common/language_module/language_module.py @@ -21,7 +21,7 @@ is_vp_last_stage, ) from megatron.core.process_groups_config import ProcessGroupCollection -from megatron.core.transformer.enums import AttnBackend +from megatron.core.transformer.enums import AttnBackend, CudaGraphScope from megatron.core.transformer.module import MegatronModule from megatron.core.transformer.transformer_config import TransformerConfig from megatron.core.transformer.utils import ensure_metadata_has_dp_cp_group @@ -144,8 +144,7 @@ def compute_language_model_loss(self, labels: Tensor, logits: Tensor) -> Tensor: # Use is_cg_capturable=True for full iteration CUDA graphs to avoid torch.equal checks is_cg_capturable = ( hasattr(self.config, 'cuda_graph_scope') - and self.config.cuda_graph_scope - and 'full_iteration' in self.config.cuda_graph_scope + and CudaGraphScope.full_iteration in self.config.cuda_graph_scope ) if is_cg_capturable and not is_te_min_version("2.7.0"): from megatron.core.utils import get_te_version diff --git a/megatron/core/models/gpt/gpt_model.py b/megatron/core/models/gpt/gpt_model.py index e840fca99b3..0e48eade8c7 100644 --- a/megatron/core/models/gpt/gpt_model.py +++ b/megatron/core/models/gpt/gpt_model.py @@ -24,7 +24,7 @@ from megatron.core.process_groups_config import ProcessGroupCollection from megatron.core.quantization.utils import get_quant_config_or_none from megatron.core.tensor_parallel import gather_from_sequence_parallel_region -from megatron.core.transformer.enums import ModelType +from megatron.core.transformer.enums import CudaGraphScope, ModelType from megatron.core.transformer.multi_token_prediction import ( MTPLossAutoScaler, MTPLossLoggingHelper, @@ -374,7 +374,7 @@ def _preprocess( and ( ( self.config.cuda_graph_impl == "local" - and "full_iteration" not in self.config.cuda_graph_scope + and CudaGraphScope.full_iteration not in self.config.cuda_graph_scope ) or self.config.flash_decode ) diff --git a/megatron/core/pipeline_parallel/schedules.py b/megatron/core/pipeline_parallel/schedules.py index d0b912349b4..18344429c45 100644 --- a/megatron/core/pipeline_parallel/schedules.py +++ b/megatron/core/pipeline_parallel/schedules.py @@ -21,6 +21,7 @@ ) from megatron.core.process_groups_config import ProcessGroupCollection from megatron.core.transformer.cuda_graphs import create_cudagraphs +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.moe.router import MoEAuxLossAutoScaler from megatron.core.utils import ( drain_embedding_wgrad_compute, @@ -656,7 +657,7 @@ def forward_backward_no_pipelining( if ( hasattr(config, 'cuda_graph_impl') and config.cuda_graph_impl == "local" - and "full_iteration" not in config.cuda_graph_scope + and CudaGraphScope.full_iteration not in config.cuda_graph_scope ): create_cudagraphs() @@ -1923,7 +1924,7 @@ def pp_post_backward(input_tensor_grad, vp_stage=None): if ( hasattr(config, 'cuda_graph_impl') and config.cuda_graph_impl == "local" - and "full_iteration" not in config.cuda_graph_scope + and CudaGraphScope.full_iteration not in config.cuda_graph_scope ): create_cudagraphs() nvtx_range_pop(suffix="misc") @@ -2310,7 +2311,7 @@ def enable_grad_sync(): if ( hasattr(config, 'cuda_graph_impl') and config.cuda_graph_impl == "local" - and "full_iteration" not in config.cuda_graph_scope + and CudaGraphScope.full_iteration not in config.cuda_graph_scope ): create_cudagraphs() diff --git a/megatron/core/ssm/mamba_block.py b/megatron/core/ssm/mamba_block.py index 1bcadd0af10..3201a8bfb28 100644 --- a/megatron/core/ssm/mamba_block.py +++ b/megatron/core/ssm/mamba_block.py @@ -25,6 +25,7 @@ from megatron.core.ssm.mamba_hybrid_layer_allocation import allocate_layers from megatron.core.tensor_parallel import get_cuda_rng_tracker from megatron.core.transformer import TransformerConfig +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.identity_op import IdentityOp from megatron.core.transformer.module import MegatronModule from megatron.core.transformer.spec_utils import ModuleSpec, build_module @@ -294,7 +295,7 @@ def forward( ( ( self.config.cuda_graph_impl == "local" - and "full_iteration" not in self.config.cuda_graph_scope + and CudaGraphScope.full_iteration not in self.config.cuda_graph_scope ) or self.config.flash_decode ) diff --git a/megatron/core/transformer/attention.py b/megatron/core/transformer/attention.py index 74031f38219..57ba494742b 100644 --- a/megatron/core/transformer/attention.py +++ b/megatron/core/transformer/attention.py @@ -45,7 +45,7 @@ from ..models.common.embeddings.yarn_rotary_pos_embedding import ( _yarn_get_concentration_factor_from_config, ) -from .enums import AttnMaskType +from .enums import AttnMaskType, CudaGraphScope from .transformer_config import TransformerConfig try: @@ -828,7 +828,7 @@ def forward( if ( in_decode_mode and self.config.cuda_graph_impl == "local" - and "full_iteration" not in self.config.cuda_graph_scope + and CudaGraphScope.full_iteration not in self.config.cuda_graph_scope and inference_context.is_static_batching() ): raise ValueError(f"CUDA graphs must use flash decode with static batching!") diff --git a/megatron/core/transformer/cuda_graphs.py b/megatron/core/transformer/cuda_graphs.py index 12f15ee980a..5b0a0333d9e 100644 --- a/megatron/core/transformer/cuda_graphs.py +++ b/megatron/core/transformer/cuda_graphs.py @@ -21,6 +21,7 @@ get_all_rng_states, get_cuda_rng_tracker, ) +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.identity_op import IdentityOp from megatron.core.transformer.module import GraphableMegatronModule, MegatronModule from megatron.core.transformer.transformer_config import TransformerConfig @@ -1344,24 +1345,24 @@ def _layer_is_graphable(layer, config): from megatron.core.transformer.moe.moe_layer import MoELayer from megatron.core.transformer.transformer_layer import TransformerLayer - if isinstance(layer, MambaLayer) and 'mamba' in config.cuda_graph_scope: + if isinstance(layer, MambaLayer) and CudaGraphScope.mamba in config.cuda_graph_scope: # mamba layer. return True if isinstance(layer, TransformerLayer): - if 'attn' in config.cuda_graph_scope and not ( + if CudaGraphScope.attn in config.cuda_graph_scope and not ( isinstance(layer.self_attention, IdentityOp) and isinstance(layer.cross_attention, IdentityOp) ): # attn layer. return True if ( - 'moe' in config.cuda_graph_scope - or 'moe_router' in config.cuda_graph_scope - or 'moe_preprocess' in config.cuda_graph_scope + CudaGraphScope.moe in config.cuda_graph_scope + or CudaGraphScope.moe_router in config.cuda_graph_scope + or CudaGraphScope.moe_preprocess in config.cuda_graph_scope ) and isinstance(layer.mlp, MoELayer): # moe layer. return True - if 'mlp' in config.cuda_graph_scope and isinstance(layer.mlp, MLP): + if CudaGraphScope.mlp in config.cuda_graph_scope and isinstance(layer.mlp, MLP): # mlp layer. return True return False @@ -1388,7 +1389,7 @@ def __init__(self, model, config, seq_length, micro_batch_size, optimizers=[]): "Setting NCCL_GRAPH_REGISTER=0 to avoid illegal memory access when using " "CUDA Graph with PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True." ) - assert "full_iteration" not in config.cuda_graph_scope, ( + assert CudaGraphScope.full_iteration not in config.cuda_graph_scope, ( "full_iteration cuda graph is not supported for cuda_graph_impl=transformer_engine. " "Please use cuda_graph_impl=local instead." ) @@ -1529,7 +1530,7 @@ def get_rotary_pos_emb(transformer_module, transformer_input): and not isinstance(layer.self_attention, IdentityOp) and ( not self.config.cuda_graph_scope - or 'attn' in self.config.cuda_graph_scope + or CudaGraphScope.attn in self.config.cuda_graph_scope ) ) if is_te_min_version("1.10.0"): @@ -1712,3 +1713,33 @@ def cuda_graph_set_manual_hooks(self): model_chunk = self.model[chunk_number] for layer in layers: layer.setup_manual_hooks(model_chunk._make_forward_pre_hook) + + def delete_cuda_graphs(self): + """ + Delete all CUDA graphs. + """ + assert self._graphs_created, "CUDA Graphs have not been created." + + graph_resettable = is_te_min_version("2.10.0") + graphs_reset, graphs_not_reset = 0, 0 + for layers in self.callables_per_chunk: + for layer in layers: + for graph in layer.cuda_graphs: + if graph_resettable: + graph.reset() + graphs_reset += 1 + else: + graphs_not_reset += 1 + layer.cuda_graphs = [] + layer.cuda_graph_manual_hooks = [] + + log_on_each_pipeline_stage( + logger=logger, + tp_group=None, + dp_cp_group=None, + level=logging.INFO, + msg=f'Rank {torch.distributed.get_rank()}: ' + f'{graphs_reset} graphs deleted with explicit reset, ' + f'{graphs_not_reset} graphs deleted without explicit reset.', + ) + self._graphs_created = False diff --git a/megatron/core/transformer/enums.py b/megatron/core/transformer/enums.py index 52b82029f90..d06d58d65f2 100644 --- a/megatron/core/transformer/enums.py +++ b/megatron/core/transformer/enums.py @@ -65,3 +65,15 @@ class AttnBackend(enum.Enum): unfused = 3 local = 4 auto = 5 + + +class CudaGraphScope(enum.Enum): + """Cuda Graph Scope - defines which parts of the model to capture.""" + + full_iteration = 1 # Captures the entire training/inference iteration + attn = 2 # Captures attention layers + mlp = 3 # Captures MLP layers (dense layers only) + moe = 4 # Captures MoE layers (drop-and-pad MoE layers only) + moe_router = 5 # Captures MoE router part + moe_preprocess = 6 # Captures MoE preprocessing part (requires moe_router) + mamba = 7 # Captures Mamba layers diff --git a/megatron/core/transformer/moe/fused_a2a.py b/megatron/core/transformer/moe/fused_a2a.py index 60b0b11a32c..045a93039b3 100644 --- a/megatron/core/transformer/moe/fused_a2a.py +++ b/megatron/core/transformer/moe/fused_a2a.py @@ -320,6 +320,14 @@ def init_hybrid_ep_buffer( ) +def reset_hybrid_ep_buffer(): + ''' + Reset the HybridEP buffer + ''' + global _hybrid_ep_buffer + _hybrid_ep_buffer = None + + class HybridEPDispatch(torch.autograd.Function): ''' Fused dispatch operation for permute + dispatch a2a + permute using the HybridEP backend diff --git a/megatron/core/transformer/moe/moe_utils.py b/megatron/core/transformer/moe/moe_utils.py index d28cbfea3fe..3ed31d375e2 100644 --- a/megatron/core/transformer/moe/moe_utils.py +++ b/megatron/core/transformer/moe/moe_utils.py @@ -11,6 +11,7 @@ from megatron.core.fp8_utils import get_fp8_align_size from megatron.core.process_groups_config import ProcessGroupCollection from megatron.core.transformer.cuda_graphs import is_graph_capturing +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.transformer_config import TransformerConfig try: @@ -1205,13 +1206,13 @@ def maybe_raise_signal(moe_layer, **kwargs): ): if ( step_condition == "route" - and 'moe_router' in moe_layer.config.cuda_graph_scope - and 'moe_preprocess' not in moe_layer.config.cuda_graph_scope + and CudaGraphScope.moe_router in moe_layer.config.cuda_graph_scope + and CudaGraphScope.moe_preprocess not in moe_layer.config.cuda_graph_scope ): raise MoECudaGraphPartialCaptureSignal(moe_layer, "route", **kwargs) elif ( step_condition == "preprocess" - and 'moe_preprocess' in moe_layer.config.cuda_graph_scope + and CudaGraphScope.moe_preprocess in moe_layer.config.cuda_graph_scope ): raise MoECudaGraphPartialCaptureSignal(moe_layer, "preprocess", **kwargs) diff --git a/megatron/core/transformer/moe/token_dispatcher.py b/megatron/core/transformer/moe/token_dispatcher.py index b2135fdb00d..af8ae572adb 100644 --- a/megatron/core/transformer/moe/token_dispatcher.py +++ b/megatron/core/transformer/moe/token_dispatcher.py @@ -16,6 +16,7 @@ gather_from_sequence_parallel_region, reduce_scatter_to_sequence_parallel_region, ) +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.moe.fused_a2a import ( fused_combine, fused_dispatch, @@ -436,7 +437,7 @@ def __init__( } if ( config.cuda_graph_impl == "transformer_engine" - and 'moe_preprocess' in config.cuda_graph_scope + and CudaGraphScope.moe_preprocess in config.cuda_graph_scope ): self.cuda_dtoh_point = "before_ep_alltoall" else: @@ -1075,10 +1076,13 @@ def combine( num_permuted_tokens=self.num_permuted_tokens, pad_multiple=self.pad_multiple, ) - # Release the used handle/num_permuted_tokens which could change in each iteration + # Release the used handle/num_permuted_tokens which could change in each iteration. + # For drop_and_pad mode, we don't need to reset the num_permuted_tokens and + # num_dispatched_tokens, because their values never change. self.handle = None - self.num_permuted_tokens = None - self.num_dispatched_tokens = None + if not self.drop_and_pad: + self.num_permuted_tokens = None + self.num_dispatched_tokens = None return hidden_states def get_permuted_hidden_states_by_experts(self, hidden_states: torch.Tensor) -> torch.Tensor: diff --git a/megatron/core/transformer/transformer_block.py b/megatron/core/transformer/transformer_block.py index 6f69927e9e8..023db1fe75a 100755 --- a/megatron/core/transformer/transformer_block.py +++ b/megatron/core/transformer/transformer_block.py @@ -21,7 +21,7 @@ ) from megatron.core.pipeline_parallel.utils import is_vp_first_stage, is_vp_last_stage from megatron.core.process_groups_config import ProcessGroupCollection -from megatron.core.transformer.enums import LayerType +from megatron.core.transformer.enums import CudaGraphScope, LayerType from megatron.core.transformer.module import GraphableMegatronModule, MegatronModule from megatron.core.transformer.spec_utils import ModuleSpec, build_module from megatron.core.transformer.transformer_config import TransformerConfig @@ -555,7 +555,7 @@ def _should_call_local_cudagraph(self, *args, **kwargs): kwargs.get('inference_context') is not None or kwargs.get('inference_params') is not None ) - and 'full_iteration' in self.config.cuda_graph_scope + and CudaGraphScope.full_iteration in self.config.cuda_graph_scope ): if kwargs['inference_context'].is_static_batching(): using_cuda_graph = kwargs['inference_context'].is_decode_only() diff --git a/megatron/core/transformer/transformer_config.py b/megatron/core/transformer/transformer_config.py index fae2e2f5d4d..cc714e9ac15 100644 --- a/megatron/core/transformer/transformer_config.py +++ b/megatron/core/transformer/transformer_config.py @@ -9,7 +9,7 @@ from megatron.core.enums import Fp4Recipe, Fp8Recipe from megatron.core.quantization.quant_config import RecipeConfig -from megatron.core.transformer.enums import AttnBackend +from megatron.core.transformer.enums import AttnBackend, CudaGraphScope from megatron.core.transformer.pipeline_parallel_layer_layout import PipelineParallelLayerLayout from ..fusions.fused_bias_geglu import quick_gelu @@ -711,7 +711,7 @@ class TransformerConfig(ModelParallelConfig): excluding optimizer) is enabled. "transformer_engine": capture the CUDA graph using TE make_graphed_callables().""" - cuda_graph_scope: Optional[List[str]] = None + cuda_graph_scope: Optional[List[CudaGraphScope]] = None """Determines the CUDA graphs capturing scope. When cuda_graph_impl is set to "transformer_engine", valid values are "attn", "mlp", "moe", "moe_router", "moe_preprocess", "mamba". None means the full layer. @@ -1593,65 +1593,76 @@ def __post_init__(self): 'use cuda_graph_impl=transformer_engine instead.' ) self.cuda_graph_impl = "transformer_engine" + if self.cuda_graph_scope is None: self.cuda_graph_scope = [] + elif not isinstance(self.cuda_graph_scope, list): + if isinstance(self.cuda_graph_scope, CudaGraphScope): + self.cuda_graph_scope = [self.cuda_graph_scope] + else: + assert isinstance(self.cuda_graph_scope, str), ( + "cuda_graph_scope must be a string that can be converted to a list of " + f"CudaGraphScope, got {self.cuda_graph_scope}." + ) + self.cuda_graph_scope = self.cuda_graph_scope.split(',') + if all(isinstance(scope, str) for scope in self.cuda_graph_scope): + # Backward compatibility for "full" scope. Now we use an empty list instead. + if "full" in self.cuda_graph_scope: + assert self.cuda_graph_scope == [ + "full" + ], "full scope cannot be used with other scopes." + warnings.warn( + "full scope is deprecated. " + "Use empty cuda_graph_scope to capture the whole layer." + ) + self.cuda_graph_scope = [] + else: + self.cuda_graph_scope = [CudaGraphScope[scope] for scope in self.cuda_graph_scope] + assert all( + isinstance(scope, CudaGraphScope) for scope in self.cuda_graph_scope + ), f"cuda_graph_scope must be a list of CudaGraphScope, got {self.cuda_graph_scope}." + if self.cuda_graph_impl != "none": assert self.cuda_graph_impl in [ "transformer_engine", "local", ], f"Invalid cuda graph implementation: {self.cuda_graph_impl}" + if self.cpu_offloading: raise ValueError("CUDA graphs not supported with CPU offloading.") - elif not isinstance(self.cuda_graph_scope, list): - assert isinstance(self.cuda_graph_scope, str), ( - "cuda_graph_scope must be a string or a list of strings, " - f"got {self.cuda_graph_scope}." - ) - self.cuda_graph_scope = [self.cuda_graph_scope] - if self.cuda_graph_impl == "local": - assert not self.cuda_graph_scope or self.cuda_graph_scope == ["full_iteration"], ( - "For local cuda graph implementation, the only valid value " - "for cuda_graph_scope is full_iteration. " - "To use other scopes, use cuda_graph_impl=transformer_engine." + assert not self.cuda_graph_scope or self.cuda_graph_scope == [ + CudaGraphScope.full_iteration + ], ( + "For local cuda graph implementation, the only valid value for " + "cuda_graph_scope is full_iteration, or an empty list to denote layerwise " + "graphs. To use other scopes, use cuda_graph_impl=transformer_engine." ) if self.cuda_graph_impl == "transformer_engine": - assert "full_iteration" not in self.cuda_graph_scope, ( + assert CudaGraphScope.full_iteration not in self.cuda_graph_scope, ( "To use full iteration cuda graph, please use " - "cuda_graph_impl=transformer_engine instead of cuda_graph_impl=local." + "cuda_graph_impl=local instead of cuda_graph_impl=transformer_engine." ) - for scope in self.cuda_graph_scope: - assert scope in [ - 'attn', - 'mlp', - 'moe', - 'moe_router', - 'moe_preprocess', - 'mamba', - ], ( - "--cuda-graph-scope should be attn, mlp, moe, moe_router, moe_preprocess, " - f"or mamba, got {self.cuda_graph_scope}." - ) - assert ( - 'moe' not in self.cuda_graph_scope or 'moe_router' not in self.cuda_graph_scope + CudaGraphScope.moe not in self.cuda_graph_scope + or CudaGraphScope.moe_router not in self.cuda_graph_scope ), 'cuda_graph_scope must not contain both moe and moe_router.' - if 'moe_preprocess' in self.cuda_graph_scope: + if CudaGraphScope.moe_preprocess in self.cuda_graph_scope: assert ( - 'moe_router' in self.cuda_graph_scope + CudaGraphScope.moe_router in self.cuda_graph_scope ), 'moe_preprocess cuda graph is only supported with moe_router cuda graph.' if self.num_moe_experts is None or self.num_moe_experts <= 1: assert ( - 'moe' not in self.cuda_graph_scope - and 'moe_router' not in self.cuda_graph_scope + CudaGraphScope.moe not in self.cuda_graph_scope + and CudaGraphScope.moe_router not in self.cuda_graph_scope ), 'moe cuda graph is only supported for MoE.' else: if self.moe_layer_freq == 1 or ( isinstance(self.moe_layer_freq, list) and 0 not in self.moe_layer_freq ): - assert 'mlp' not in self.cuda_graph_scope, ( + assert CudaGraphScope.mlp not in self.cuda_graph_scope, ( 'mlp cuda graph is only supported for dense layers, ' 'but not found in the model.' ) @@ -1660,13 +1671,13 @@ def __post_init__(self): or not self.moe_pad_expert_input_to_capacity ): assert ( - 'moe' not in self.cuda_graph_scope + CudaGraphScope.moe not in self.cuda_graph_scope ), 'moe cuda graph is only supported with drop-padding MoE.' if self.moe_token_dispatcher_type == 'alltoall' and ( self.moe_expert_capacity_factor is not None or self.moe_router_padding_for_quantization ): - assert 'moe_preprocess' not in self.cuda_graph_scope, ( + assert CudaGraphScope.moe_preprocess not in self.cuda_graph_scope, ( 'moe_preprocess cuda graph is not supported when there are ' 'DtoH copies and synchronizations in the preprocess step.' ) @@ -1676,25 +1687,28 @@ def __post_init__(self): raise ValueError( "Full-layer CUDA graphs not supported with activation recomputation." ) - elif self.cuda_graph_scope != ['full_iteration']: + elif self.cuda_graph_scope != [CudaGraphScope.full_iteration]: # For scoped CUDA graphs, only the non-graphed parts of the layer can be # recomputed. So check if there are overlaps between the recomputed parts # and the graphed parts. - if "attn" in self.cuda_graph_scope: + if CudaGraphScope.attn in self.cuda_graph_scope: for module in self.recompute_modules: if module in ['core_attn', 'mla_up_proj']: raise ValueError( f'attn cuda graph is not supported with {module} recompute.' ) - if "mlp" in self.cuda_graph_scope and "mlp" in self.recompute_modules: + if ( + CudaGraphScope.mlp in self.cuda_graph_scope + and "mlp" in self.recompute_modules + ): raise ValueError(f'mlp cuda graph is not supported with mlp recompute.') - if "moe" in self.cuda_graph_scope: + if CudaGraphScope.moe in self.cuda_graph_scope: for module in self.recompute_modules: if module in ['moe_act', 'moe', 'shared_experts']: raise ValueError( f'moe cuda graph is not supported with {module} recompute.' ) - if "moe_router" in self.cuda_graph_scope: + if CudaGraphScope.moe_router in self.cuda_graph_scope: for module in self.recompute_modules: if module in ['moe', 'shared_experts']: raise ValueError( @@ -1703,25 +1717,25 @@ def __post_init__(self): ) if "layernorm" in self.recompute_modules: if ( - "attn" in self.cuda_graph_scope - and "mlp" in self.cuda_graph_scope + CudaGraphScope.attn in self.cuda_graph_scope + and CudaGraphScope.mlp in self.cuda_graph_scope and ( - "moe" in self.cuda_graph_scope - or "moe_router" in self.cuda_graph_scope + CudaGraphScope.moe in self.cuda_graph_scope + or CudaGraphScope.moe_router in self.cuda_graph_scope ) ): raise ValueError( 'cuda graph is not supported with layernorm recompute.' ) - if "attn" in self.cuda_graph_scope: + if CudaGraphScope.attn in self.cuda_graph_scope: warnings.warn( "input_layernorm recompute is not supported with attention " "cudagraph. Will only recompute the pre_mlp_layernorm." ) if ( - "mlp" in self.cuda_graph_scope - or "moe" in self.cuda_graph_scope - or "moe_router" in self.cuda_graph_scope + CudaGraphScope.mlp in self.cuda_graph_scope + or CudaGraphScope.moe in self.cuda_graph_scope + or CudaGraphScope.moe_router in self.cuda_graph_scope ): warnings.warn( "pre_mlp_layernorm recompute is not supported with mlp/moe " diff --git a/megatron/core/transformer/transformer_layer.py b/megatron/core/transformer/transformer_layer.py index f89678e6216..3ea40577009 100644 --- a/megatron/core/transformer/transformer_layer.py +++ b/megatron/core/transformer/transformer_layer.py @@ -16,7 +16,7 @@ from megatron.core.packed_seq_params import PackedSeqParams from megatron.core.process_groups_config import ProcessGroupCollection from megatron.core.transformer.cuda_graphs import is_graph_capturing -from megatron.core.transformer.enums import LayerType +from megatron.core.transformer.enums import CudaGraphScope, LayerType from megatron.core.transformer.identity_op import IdentityFuncOp, IdentityOp from megatron.core.transformer.mlp import MLP from megatron.core.transformer.module import GraphableMegatronModule @@ -382,18 +382,21 @@ def __init__( if "layernorm" in self.config.recompute_modules: if not isinstance(self.input_layernorm, IdentityOp) and ( self.config.cuda_graph_impl == "none" - or 'attn' not in self.config.cuda_graph_scope + or CudaGraphScope.attn not in self.config.cuda_graph_scope ): self.recompute_input_layernorm = True if self.config.fp8 or self.config.fp4: self.self_attention.set_for_recompute_input_layernorm() if not isinstance(self.pre_mlp_layernorm, IdentityOp) and ( self.config.cuda_graph_impl == "none" - or (not self.is_moe_layer and 'mlp' not in self.config.cuda_graph_scope) + or ( + not self.is_moe_layer + and CudaGraphScope.mlp not in self.config.cuda_graph_scope + ) or ( self.is_moe_layer - and 'moe' not in self.config.cuda_graph_scope - and 'moe_router' not in self.config.cuda_graph_scope + and CudaGraphScope.moe not in self.config.cuda_graph_scope + and CudaGraphScope.moe_router not in self.config.cuda_graph_scope ) ): self.recompute_pre_mlp_layernorm = True @@ -634,12 +637,13 @@ def _forward_mlp(self, hidden_states, inference_context=None): and self.config.cuda_graph_impl == "transformer_engine" and self.training and is_graph_capturing() - and 'moe_router' in self.config.cuda_graph_scope + and CudaGraphScope.moe_router in self.config.cuda_graph_scope ): assert ( not self.recompute_pre_mlp_layernorm ), "Recomputation is not supported for CUDA graph." cudagraph_outputs = self.mlp(pre_mlp_layernorm_output) + nvtx_range_pop(suffix="mlp") return cudagraph_outputs + [residual] elif self.recompute_mlp: if self.config.fp8 or self.config.fp4: @@ -694,6 +698,7 @@ def _forward_post_mlp(self, mlp_output_with_bias, residual): Returns: output (Tensor): Transformed hidden states of shape [s, b, h]. """ + from megatron.core.pipeline_parallel.fine_grained_activation_offload import ( fine_grained_offloading_group_commit, ) @@ -757,7 +762,7 @@ def get_layer_static_inputs(self, seq_length, micro_batch_size): static_inputs = super().get_layer_static_inputs(seq_length, micro_batch_size) if not isinstance(self.self_attention, IdentityOp) and ( - not self.config.cuda_graph_scope or 'attn' in self.config.cuda_graph_scope + not self.config.cuda_graph_scope or CudaGraphScope.attn in self.config.cuda_graph_scope ): slen_per_cp = seq_length // self.config.context_parallel_size static_inputs["attention_mask"] = ( @@ -776,18 +781,18 @@ def _get_submodules_under_cudagraphs(self): return super()._get_submodules_under_cudagraphs() submodules = [] - if 'attn' in self.config.cuda_graph_scope: + if CudaGraphScope.attn in self.config.cuda_graph_scope: submodules += [ self.input_layernorm, self.self_attention, self.pre_cross_attn_layernorm, self.cross_attention, ] - if (not self.is_moe_layer and 'mlp' in self.config.cuda_graph_scope) or ( - self.is_moe_layer and 'moe' in self.config.cuda_graph_scope + if (not self.is_moe_layer and CudaGraphScope.mlp in self.config.cuda_graph_scope) or ( + self.is_moe_layer and CudaGraphScope.moe in self.config.cuda_graph_scope ): submodules += [self.pre_mlp_layernorm, self.mlp] - elif self.is_moe_layer and 'moe_router' in self.config.cuda_graph_scope: + elif self.is_moe_layer and CudaGraphScope.moe_router in self.config.cuda_graph_scope: submodules += [self.pre_mlp_layernorm, self.mlp.router] if ( self.config.moe_shared_expert_intermediate_size is not None @@ -805,7 +810,7 @@ def _te_cuda_graph_capture(self, *args, **kwargs): 2. If context is None, it cannot be returned as output. """ context = None - if not self.config.cuda_graph_scope or 'attn' in self.config.cuda_graph_scope: + if not self.config.cuda_graph_scope or CudaGraphScope.attn in self.config.cuda_graph_scope: hidden_states, context = self._forward_attention(*args, **kwargs) else: if len(args) > 0: @@ -815,12 +820,12 @@ def _te_cuda_graph_capture(self, *args, **kwargs): if ( not self.config.cuda_graph_scope - or (not self.is_moe_layer and 'mlp' in self.config.cuda_graph_scope) + or (not self.is_moe_layer and CudaGraphScope.mlp in self.config.cuda_graph_scope) or ( self.is_moe_layer and ( - 'moe' in self.config.cuda_graph_scope - or 'moe_router' in self.config.cuda_graph_scope + CudaGraphScope.moe in self.config.cuda_graph_scope + or CudaGraphScope.moe_router in self.config.cuda_graph_scope ) ) ): @@ -841,7 +846,7 @@ def _te_cuda_graph_replay(self, *args, **kwargs): Hence, `inference_context` and `packed_seq_params` are excluded from input list. """ context = None - if self.config.cuda_graph_scope and 'attn' not in self.config.cuda_graph_scope: + if self.config.cuda_graph_scope and CudaGraphScope.attn not in self.config.cuda_graph_scope: hidden_states, context = self._forward_attention(*args, **kwargs) args = (hidden_states,) kwargs = {} @@ -861,13 +866,13 @@ def _te_cuda_graph_replay(self, *args, **kwargs): if ( not self.config.cuda_graph_scope - or (not self.is_moe_layer and 'mlp' in self.config.cuda_graph_scope) - or (self.is_moe_layer and 'moe' in self.config.cuda_graph_scope) + or (not self.is_moe_layer and CudaGraphScope.mlp in self.config.cuda_graph_scope) + or (self.is_moe_layer and CudaGraphScope.moe in self.config.cuda_graph_scope) ): # CUDA Graph captures the whole MLP/MoE part. CUDA Graph output is the layer output. assert len(cuda_graph_output) == 1, "CUDA Graph output should be the layer output." output = cuda_graph_output.pop() - elif self.is_moe_layer and 'moe_router' in self.config.cuda_graph_scope: + elif self.is_moe_layer and CudaGraphScope.moe_router in self.config.cuda_graph_scope: # CUDA Graph partially captures the MoE. # The rest of the layer should go to the normal pass. shared_expert_output, routing_map, residual = None, None, None @@ -882,7 +887,7 @@ def _te_cuda_graph_replay(self, *args, **kwargs): # Split cudagraph outputs into function outputs and attribute outputs, and # process them separately. Function outputs should have three tensors. func_output, attr_outputs = cuda_graph_output[:3], cuda_graph_output[3:] - if 'moe_preprocess' in self.config.cuda_graph_scope: + if CudaGraphScope.moe_preprocess in self.config.cuda_graph_scope: hidden_states, probs, residual = func_output valid_cudagraph_attrs = self.mlp.token_dispatcher.valid_cudagraph_attrs assert len(attr_outputs) == len( @@ -989,7 +994,7 @@ def _should_call_local_cudagraph(self, *args, **kwargs): (kwargs.get('inference_context') is not None) or (kwargs.get('inference_params') is not None) ) - and 'full_iteration' not in self.config.cuda_graph_scope + and CudaGraphScope.full_iteration not in self.config.cuda_graph_scope ): if kwargs['inference_context'].is_static_batching(): using_cuda_graph = kwargs['inference_context'].is_decode_only() diff --git a/megatron/training/arguments.py b/megatron/training/arguments.py index bb1b17e9ba2..15576e2ceac 100644 --- a/megatron/training/arguments.py +++ b/megatron/training/arguments.py @@ -23,7 +23,7 @@ from megatron.core.rerun_state_machine import RerunStateMachine from megatron.core.transformer import MLATransformerConfig, TransformerConfig from megatron.core.transformer.pipeline_parallel_layer_layout import PipelineParallelLayerLayout -from megatron.core.transformer.enums import AttnBackend +from megatron.core.transformer.enums import AttnBackend, CudaGraphScope from megatron.core.transformer.heterogeneous.heterogeneous_config import ( HeterogeneousTransformerConfig, MLPConfig, @@ -772,7 +772,7 @@ def validate_args(args, defaults={}): if args.rank == 0: print('accumulate and all-reduce gradients in fp32 for ' 'bfloat16 data type.', flush=True) - if args.cuda_graph_impl == "local" and "full_iteration" in args.cuda_graph_scope: + if args.cuda_graph_impl == "local" and CudaGraphScope.full_iteration in args.cuda_graph_scope: if not args.inference_dynamic_batching: assert not args.check_for_nan_in_loss_and_grad, \ "--no-check-for-nan-in-loss-and-grad should be set with full_iteration CUDA graph" @@ -1265,6 +1265,15 @@ def validate_args(args, defaults={}): assert ( args.recompute_granularity != 'full' ), 'recompute_granularity must not be full when CUDA Graphs are enabled.' + if args.cuda_graph_scope == "full" or ( + isinstance(args.cuda_graph_scope, list) and "full" in args.cuda_graph_scope + ): + if isinstance(args.cuda_graph_scope, list): + assert args.cuda_graph_scope == ["full"], "full scope cannot be used with other scopes." + args.cuda_graph_scope = [] + warn_rank_0( + 'full scope is deprecated. Use empty cuda_graph_scope to capture the whole layer.' + ) if args.multi_latent_attention: assert not args.group_query_attention, "Group query attention is mutually exclusive with multi latent attention." @@ -1486,7 +1495,7 @@ def _add_inference_args(parser): '"none": no CUDA graph. ' '"local": capture the CUDA graph using MCore local implementation. --cuda-graph-scope=\"full_iteration\" enables whole iteration CUDA graph. ' '"transformer_engine": capture the CUDA graph using TE make_graphed_callables().') - group.add_argument('--cuda-graph-scope', nargs='+', type=str, default=[], + group.add_argument('--cuda-graph-scope', nargs='+', type=lambda scope: CudaGraphScope[scope] if scope != "full" else scope, default=[], help='Determines the CUDA graphs capturing scope. ' 'choices: "attn", "mlp", "moe", "moe_router", "moe_preprocess", "mamba", "full_iteration". ' '"attn": captures operations in TransformerLayer._forward_attention(). ' @@ -1498,7 +1507,8 @@ def _add_inference_args(parser): '"mamba": captures the mamba layer. ' '"full_iteration": captures a whole iteration. ' 'full_iteration scope is only supported with --cuda-graph-impl=local, other scopes are only supported with --cuda-graph-impl=transformer_engine. ' - 'If not specified, the default scope is to capture the whole Transformer layer.') + 'If not specified, the default scope is to capture the whole Transformer layer. ' + 'For backward compatibility, we still allow passing "full" to specify capturing the whole layer, and convert it to an empty list.') group.add_argument('--use-legacy-static-engine', action='store_true', default=False, help='Use legacy static engine. (Current static engine uses dynamic engine under the hood)', dest='use_legacy_static_engine') diff --git a/megatron/training/training.py b/megatron/training/training.py index 9986f931641..2029f4c0bbc 100644 --- a/megatron/training/training.py +++ b/megatron/training/training.py @@ -59,6 +59,7 @@ from megatron.training.checkpointing import checkpoint_exists from megatron.core.full_cuda_graph import FullCudaGraphWrapper from megatron.core.transformer.cuda_graphs import TECudaGraphHelper +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.module import Float16Module from megatron.core.distributed import DistributedDataParallelConfig, TorchFullyShardedDataParallelConfig from megatron.core.distributed import DistributedDataParallel as DDP @@ -2261,7 +2262,7 @@ def train( eval_iterations = 0 # Wrap forward_backward_func for Full iteration CUDA graph forward_backward_func = get_forward_backward_func() - if args.cuda_graph_impl == "local" and "full_iteration" in args.cuda_graph_scope: + if args.cuda_graph_impl == "local" and CudaGraphScope.full_iteration in args.cuda_graph_scope: forward_backward_func = FullCudaGraphWrapper(forward_backward_func, cuda_graph_warmup_steps=args.cuda_graph_warmup_steps) def get_e2e_base_metrics(): @@ -2609,6 +2610,10 @@ def get_e2e_base_metrics(): if should_exit: break + # Destroy CUDA Graphs. + if args.cuda_graph_impl == "transformer_engine" and cuda_graph_helper.graphs_created(): + cuda_graph_helper.delete_cuda_graphs() + one_logger_utils.track_e2e_metrics() # Flush TensorBoard, WandB writers and one-logger. @@ -2682,7 +2687,7 @@ def evaluate( eval_batch_size = args.global_batch_size eval_num_microbatches = eval_batch_size // (args.micro_batch_size * args.data_parallel_size) forward_backward_func = get_forward_backward_func() - if args.cuda_graph_impl == "local" and "full_iteration" in args.cuda_graph_scope: + if args.cuda_graph_impl == "local" and CudaGraphScope.full_iteration in args.cuda_graph_scope: forward_backward_func = FullCudaGraphWrapper(forward_backward_func, cuda_graph_warmup_steps=args.cuda_graph_warmup_steps) if eval_iters is None: diff --git a/tests/unit_tests/inference/engines/test_dynamic_engine.py b/tests/unit_tests/inference/engines/test_dynamic_engine.py index 0ac4b296746..26d3dcfbd6d 100644 --- a/tests/unit_tests/inference/engines/test_dynamic_engine.py +++ b/tests/unit_tests/inference/engines/test_dynamic_engine.py @@ -3,7 +3,7 @@ import asyncio import random import types -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple import pytest @@ -41,6 +41,7 @@ from megatron.core.models.mamba.mamba_model import MambaModel from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed from megatron.core.transformer.cuda_graphs import CudaGraphManager, _CudagraphGlobalRecord +from megatron.core.transformer.enums import CudaGraphScope from megatron.core.transformer.transformer_config import TransformerConfig from megatron.core.utils import ( check_mamba_sequence_packing_support, @@ -103,7 +104,9 @@ class DynamicEngineTestConfig: return_log_probs: bool = False materialize_only_last_token_logits: bool = True skip_prompt_log_probs: bool = False - cuda_graph_scope: List[str] = None + cuda_graph_scope: List[CudaGraphScope] = field( + default_factory=lambda: [CudaGraphScope.full_iteration] + ) force_build_cuda_graphs: bool = False # If False, do not build cuda graphs in the tests, even if # num_cuda_graphs is set. @@ -136,9 +139,6 @@ def __post_init__(self): if self.context_max_tokens_override is None: self.context_max_tokens_override = self.num_requests * self.max_sequence_length - if self.cuda_graph_scope is None: - self.cuda_graph_scope = ["full_iteration"] - @dataclass class DynamicEngineTestEnv: @@ -514,7 +514,7 @@ def teardown_method(self, method): ) @pytest.mark.parametrize("model_provider", ["gpt", "mamba"]) @pytest.mark.parametrize("num_cuda_graphs", [None, 1, 4]) - @pytest.mark.parametrize("cuda_graph_scope", [[], ["full_iteration"]]) + @pytest.mark.parametrize("cuda_graph_scope", [[], [CudaGraphScope.full_iteration]]) def test_simple(self, model_provider, num_cuda_graphs, cuda_graph_scope) -> None: """Simple test that runs without errors, and validates output.""" skip_if_mamba_sequence_packing_not_available(model_provider) diff --git a/tests/unit_tests/test_fp8_param.py b/tests/unit_tests/test_fp8_param.py index 0b8d41769ec..361698f7127 100644 --- a/tests/unit_tests/test_fp8_param.py +++ b/tests/unit_tests/test_fp8_param.py @@ -1,4 +1,4 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. import contextlib import gc @@ -36,7 +36,10 @@ try: from transformer_engine.pytorch.tensor.utils import post_all_gather_processing - cuda_graph_supported = True + if is_te_min_version("2.10.0"): + cuda_graph_supported = True + else: + reason_for_no_cuda_graph = "Need newer TransformerEngine" except ImportError: reason_for_no_cuda_graph = "Need newer TransformerEngine" @@ -65,12 +68,16 @@ class TestFP8Param: def setup_method(self, method): self.seq_length = 512 self.micro_batch_size = 2 + self.cuda_graph_helper = None os.environ['CUDA_DEVICE_MAX_CONNECTIONS'] = '1' def teardown_method(self, method): Utils.destroy_model_parallel() destroy_global_vars() destroy_num_microbatches_calculator() + if self.cuda_graph_helper is not None and self.cuda_graph_helper.graphs_created(): + self.cuda_graph_helper.delete_cuda_graphs() + self.cuda_graph_helper = None gc.collect() def model_provider( @@ -209,13 +216,12 @@ def _run_test_helper( ) assert len(gpt_model) == 1 # Assume only one model in the model provider. - cuda_graph_helper = None # Hard coded to use cuda_graph_impl="transformer_engine" cuda_graph_impl = "transformer_engine" if use_cuda_graph and cuda_graph_impl == "transformer_engine": from megatron.core.transformer.cuda_graphs import TECudaGraphHelper - cuda_graph_helper = TECudaGraphHelper( + self.cuda_graph_helper = TECudaGraphHelper( model=gpt_model, config=gpt_model[0].config, seq_length=self.seq_length, @@ -250,13 +256,13 @@ def _run_test_helper( # Capture CUDA graphs after warmup if helper is provided. # Hard coded cuda_graph_warmup_steps = 0. cuda_graph_warmup_steps = 0 - if cuda_graph_helper is not None and i == cuda_graph_warmup_steps: + if self.cuda_graph_helper is not None and i == cuda_graph_warmup_steps: if should_disable_forward_pre_hook(args): disable_forward_pre_hook(gpt_model, param_sync=False) - cuda_graph_helper.create_cudagraphs() + self.cuda_graph_helper.create_cudagraphs() if should_disable_forward_pre_hook(args): enable_forward_pre_hook(gpt_model) - cuda_graph_helper.cuda_graph_set_manual_hooks() + self.cuda_graph_helper.cuda_graph_set_manual_hooks() # For the mxfp8_param with reuse_grad_buf_for_mxfp8_param_ag and dp_ag_overlap, # we need to call the _copy_main_params_to_param_buffer() after the grad buffer @@ -297,6 +303,10 @@ def _run_test_helper( loss_list.append(loss.item()) + if self.cuda_graph_helper is not None and self.cuda_graph_helper.graphs_created(): + self.cuda_graph_helper.delete_cuda_graphs() + self.cuda_graph_helper = None + return torch.tensor(loss_list) def run_test(self, tp_size, recipe, inference: bool = False, **kwargs): diff --git a/tests/unit_tests/transformer/test_cuda_graphs.py b/tests/unit_tests/transformer/test_cuda_graphs.py index 3ad0262a1cf..cee75171560 100644 --- a/tests/unit_tests/transformer/test_cuda_graphs.py +++ b/tests/unit_tests/transformer/test_cuda_graphs.py @@ -9,6 +9,7 @@ import pytest import torch +from transformer_engine.pytorch.fp8 import check_fp8_support from megatron.core import parallel_state from megatron.core.enums import ModelType @@ -25,6 +26,7 @@ TextGenerationController, ) from megatron.core.models.gpt.gpt_layer_specs import ( + get_gpt_decoder_block_spec, get_gpt_layer_local_spec, get_gpt_layer_with_transformer_engine_spec, get_gpt_mtp_block_spec, @@ -41,6 +43,8 @@ model_parallel_cuda_manual_seed, ) from megatron.core.transformer.cuda_graphs import CudaGraphManager, _CudagraphGlobalRecord +from megatron.core.transformer.enums import CudaGraphScope +from megatron.core.transformer.moe.fused_a2a import reset_hybrid_ep_buffer from megatron.core.transformer.transformer_block import TransformerBlock from megatron.core.transformer.transformer_config import TransformerConfig from megatron.core.utils import is_fa_min_version, is_te_min_version @@ -54,6 +58,8 @@ from megatron.training.training import setup_model_and_optimizer from tests.unit_tests.test_utilities import Utils +fp8_available, _ = check_fp8_support() + class TestParallelTransformerBlockCudagraphs: def setup_method(self, method): @@ -747,6 +753,9 @@ class TestPartialCudaGraph: def setup_method(self, method): self.seq_length = 512 self.micro_batch_size = 2 + self.tp_size = 2 + self.cp_size = 2 + self.cuda_graph_helper = None # Store original environment variable values self.original_env = { 'CUDA_DEVICE_MAX_CONNECTIONS': os.environ.get('CUDA_DEVICE_MAX_CONNECTIONS'), @@ -762,22 +771,28 @@ def teardown_method(self, method): os.environ.pop(key, None) else: os.environ[key] = value - Utils.destroy_model_parallel() destroy_global_vars() destroy_num_microbatches_calculator() + if self.cuda_graph_helper is not None and self.cuda_graph_helper.graphs_created(): + self.cuda_graph_helper.delete_cuda_graphs() + self.cuda_graph_helper = None gc.collect() def model_provider( self, pre_process=True, post_process=True, - layer_spec_fn=get_gpt_layer_with_transformer_engine_spec, + layer_spec_fn=get_gpt_decoder_block_spec, **config_kwargs, ): - model_parallel_cuda_manual_seed(123) args = get_args() config = core_transformer_config_from_args(args) - transformer_layer_spec = layer_spec_fn() + transformer_layer_spec = layer_spec_fn( + config, + use_transformer_engine=True, + normalization=args.normalization, + qk_l2_norm=args.qk_l2_norm, + ) if args.mtp_num_layers: mtp_block_spec = get_gpt_mtp_block_spec( config, transformer_layer_spec, use_transformer_engine=True @@ -810,18 +825,17 @@ def create_test_args( args.num_layers = 4 args.mtp_num_layers = 1 args.vocab_size = 1024 - args.hidden_size = 128 + args.hidden_size = 512 args.num_attention_heads = 8 args.max_position_embeddings = 512 - args.global_batch_size = self.micro_batch_size * 8 + args.global_batch_size = self.micro_batch_size * 8 // self.tp_size // self.cp_size args.micro_batch_size = self.micro_batch_size args.create_attention_mask_in_dataloader = True args.seq_length = self.seq_length - args.tensor_model_parallel_size = 2 - args.sequence_parallel = True + args.tensor_model_parallel_size = self.tp_size + args.sequence_parallel = True if self.tp_size > 1 else False args.pipeline_model_parallel_size = 1 - args.context_parallel_size = 1 - args.expert_model_parallel_size = ep_size + args.context_parallel_size = self.cp_size args.train_iters = 10 args.lr = 3e-5 args.bf16 = True @@ -836,17 +850,26 @@ def create_test_args( # MoE settings args.num_experts = 4 args.expert_model_parallel_size = ep_size + args.expert_tensor_parallel_size = 1 if ep_size > 1 else self.tp_size args.moe_shared_expert_intermediate_size = 1024 - args.moe_layer_freq = "[0,0,1,1]" + args.moe_layer_freq = [0, 0, 1, 1] args.moe_permute_fusion = True args.moe_router_fusion = True args.moe_router_topk = 2 + args.moe_router_dtype = "fp32" # CUDA graph settings args.cuda_graph_impl = cuda_graph_impl args.cuda_graph_scope = cuda_graph_scope args.cuda_graph_warmup_steps = cuda_graph_warmup_steps - args.use_te_rng_tracker = cuda_graph_impl != "none" + + # fp8 settings + if fp8_available: + args.fp8 = "e4m3" + args.fp8_recipe = "tensorwise" + args.first_last_layers_bf16 = True + args.num_layers_at_start_in_bf16 = 1 + args.num_layers_at_end_in_bf16 = 1 for key, value in kwargs.items(): assert hasattr(args, key) @@ -856,15 +879,15 @@ def create_test_args( set_global_variables(args, False) return args - def get_batch(self, seq_length, micro_batch_size): - data = list(range(seq_length)) + def get_batch(self, seq_length, micro_batch_size, cp_size): + data = list(range(seq_length // cp_size)) input_ids = torch.tensor(data, dtype=torch.int64).repeat((micro_batch_size, 1)).cuda() labels = 1 + torch.tensor(data, dtype=torch.int64).repeat((micro_batch_size, 1)).cuda() position_ids = torch.tensor(data, dtype=torch.int64).repeat((micro_batch_size, 1)).cuda() attention_mask = torch.ones( - (micro_batch_size, 1, seq_length, seq_length), dtype=bool + (micro_batch_size, 1, seq_length // cp_size, seq_length), dtype=bool ).cuda() - loss_mask = torch.ones(seq_length).repeat((micro_batch_size, 1)).cuda() + loss_mask = torch.ones(seq_length // cp_size).repeat((micro_batch_size, 1)).cuda() return input_ids, labels, position_ids, attention_mask, loss_mask def _run_test_helper( @@ -877,12 +900,10 @@ def _run_test_helper( set_args(args) torch.manual_seed(123) - Utils.initialize_model_parallel( - tensor_model_parallel_size=2, expert_model_parallel_size=ep_size - ) + model_parallel_cuda_manual_seed(123) input_ids, labels, position_ids, attention_mask, loss_mask = self.get_batch( - self.seq_length, self.micro_batch_size + self.seq_length, self.micro_batch_size, self.cp_size ) gpt_model, optimizer, _ = setup_model_and_optimizer( @@ -890,13 +911,10 @@ def _run_test_helper( ) assert len(gpt_model) == 1 # Assume only one model in the model provider. - loss_list = [] - - cuda_graph_helper = None if cuda_graph_impl == "transformer_engine": from megatron.core.transformer.cuda_graphs import TECudaGraphHelper - cuda_graph_helper = TECudaGraphHelper( + self.cuda_graph_helper = TECudaGraphHelper( model=gpt_model, config=gpt_model[0].config, seq_length=self.seq_length, @@ -904,14 +922,17 @@ def _run_test_helper( optimizers=[optimizer], ) + loss_list = [] + for i in range(100): gpt_model[0].zero_grad_buffer() optimizer.zero_grad() # Capture CUDA graphs after warmup if helper is provided - if cuda_graph_helper is not None and i == cuda_graph_warmup_steps: - cuda_graph_helper.create_cudagraphs() + if self.cuda_graph_helper is not None and i == cuda_graph_warmup_steps: + self.cuda_graph_helper.create_cudagraphs() + gpt_model[0].set_is_first_microbatch() output = gpt_model[0].forward( input_ids=input_ids, position_ids=position_ids, @@ -922,7 +943,7 @@ def _run_test_helper( # Check output shapes assert output.shape[0] == self.micro_batch_size - assert output.shape[1] == self.seq_length + assert output.shape[1] == self.seq_length // self.cp_size # Verify gradients loss = output.mean() @@ -936,16 +957,29 @@ def _run_test_helper( loss_list.append(loss.item()) + if self.cuda_graph_helper is not None and self.cuda_graph_helper.graphs_created(): + self.cuda_graph_helper.delete_cuda_graphs() + self.cuda_graph_helper = None + return torch.tensor(loss_list) @pytest.mark.skipif( - not (HAVE_TE and is_te_min_version("1.14.0")), - reason="Partial CUDA graph support requires TransformerEngine version >= 1.14.0", + not (HAVE_TE and is_te_min_version("2.10.0")), + reason="Partial CUDA graph UT support requires TransformerEngine version >= 2.10.0", ) @pytest.mark.parametrize("ep_size", [1, 4]) @pytest.mark.parametrize("moe_dropless_dispatcher", [False, True]) @pytest.mark.parametrize("moe_dispatcher_type", ["alltoall", "deepep", "hybridep"]) def test_moe_partial_cudagraph(self, ep_size, moe_dropless_dispatcher, moe_dispatcher_type): + initialize_rng_tracker(use_te_rng_tracker=True, force_reset=True) + Utils.initialize_model_parallel( + tensor_model_parallel_size=self.tp_size, + context_parallel_size=self.cp_size, + pipeline_model_parallel_size=1, + expert_tensor_parallel_size=1 if ep_size > 1 else self.tp_size, + expert_model_parallel_size=ep_size, + ) + extra_kwargs = {} if moe_dispatcher_type == "deepep": if not is_deep_ep_available(): @@ -962,19 +996,28 @@ def test_moe_partial_cudagraph(self, ep_size, moe_dropless_dispatcher, moe_dispa if not moe_dropless_dispatcher: if moe_dispatcher_type == "deepep": pytest.skip("Deep EP doesn't support drop&pad MoE") + if moe_dispatcher_type == "hybridep" and ep_size == 1: + pytest.skip("Hybrid EP doesn't support drop&pad MoE with ep_size == 1") extra_kwargs["moe_expert_capacity_factor"] = 1.0 extra_kwargs["moe_pad_expert_input_to_capacity"] = True loss_list_ref = self._run_test_helper(ep_size, "none", None, 0, **extra_kwargs) for cuda_graph_scope in [ None, - ["attn"], - ["moe"], - ["mlp", "moe_router"], - ["attn", "mlp", "moe_router", "moe_preprocess"], + [CudaGraphScope.attn], + [CudaGraphScope.moe], + [CudaGraphScope.mlp, CudaGraphScope.moe_router], + [ + CudaGraphScope.attn, + CudaGraphScope.mlp, + CudaGraphScope.moe_router, + CudaGraphScope.moe_preprocess, + ], ]: - if moe_dropless_dispatcher and (cuda_graph_scope is None or "moe" in cuda_graph_scope): - # Dropless MoE doesn't work with "moe" scope cudagraph. Skip. + if (moe_dropless_dispatcher or moe_dispatcher_type == "hybridep") and ( + cuda_graph_scope is None or CudaGraphScope.moe in cuda_graph_scope + ): + # Dropless MoE or Hybrid EP doesn't work with "moe" scope cudagraph. Skip. continue cuda_graph_warmup_steps = 3 loss_list = self._run_test_helper( @@ -986,6 +1029,10 @@ def test_moe_partial_cudagraph(self, ep_size, moe_dropless_dispatcher, moe_dispa ) assert torch.equal(loss_list, loss_list_ref) + if moe_dispatcher_type == "hybridep": + reset_hybrid_ep_buffer() + Utils.destroy_model_parallel() + if __name__ == "__main__":