diff --git a/examples/deepseek_v3/conf/train/engram.yaml b/examples/deepseek_v3/conf/train/engram.yaml
new file mode 100644
index 0000000000..512d8ca1a7
--- /dev/null
+++ b/examples/deepseek_v3/conf/train/engram.yaml
@@ -0,0 +1,149 @@
+# DeepSeek Engram 27B
+system:
+  no_shared_fs: ${experiment.runner.no_shared_fs}
+  num_workers: 2
+  tensor_model_parallel_size: 8
+  expert_model_parallel_size: 8
+  expert_tensor_parallel_size: 1
+  context_parallel_size: 1
+  engram_embedding_parallel_size: 8
+  sequence_parallel: true
+  use_distributed_optimizer: true
+  overlap_grad_reduce: true
+  overlap_param_gather: true
+  precision:
+    bf16: true
+    attention_softmax_in_fp32: true
+    accumulate_allreduce_grads_in_fp32: true
+  logging:
+    log_interval: 1
+    tensorboard_log_interval: 1
+    wandb_project: ${experiment.exp_name}
+    wandb_exp_name: ${experiment.exp_name}
+    log_timers_to_tensorboard: true
+    log_validation_ppl_to_tensorboard: true
+    log_throughput: true
+    log_params_norm: true
+    log_num_zeros_in_grad: true
+    log_memory_to_tensorboard: true
+  checkpoint:
+    save_interval: ${experiment.save_steps}
+    load: ${experiment.load}
+    ckpt_format: ${experiment.ckpt_format}
+
+model:
+  # nsys profile args =================
+  # profile: true
+  # profile_step_start: 5
+  # profile_step_end: 6
+  # profile_ranks: [0,7] # default [0]
+  # Note: the job must be launched under `nsys profile` for these to take effect.
+
+  # torch profiler args =================
+  # profile: true
+  # use_pytorch_profiler: true
+  # profile_step_start: 5
+  # profile_step_end: 6
+  # profile_ranks: [0] # default [0]
+  # tensorboard_dir: /workspace/torch_profile
+  transformer_impl: transformer_engine
+  num_layers: 30
+  hidden_size: 2560
+  num_attention_heads: 32
+  num_query_groups: 32 # num_key_value_heads
+  seq_length: 4096
+  max_position_embeddings: 4096
+  norm_epsilon: 1e-6
+  use_rotary_position_embeddings: true
+  rotary_base: 1000000
+  swiglu: true
+  normalization: RMSNorm
+  qk_layernorm: true
+  init_method_std: 0.02
+  attention_dropout: 0.0
+  hidden_dropout: 0.0
+  position_embedding_type: rope
+  untie_embeddings_and_output_weights: true
+  no_position_embedding: true
+  no_rope_fusion: true
+  disable_bias_linear: true
+
+  # mla args ==================
+  multi_latent_attention: true
+  q_lora_rank: 768
+  kv_lora_rank: 512
+  qk_head_dim: 128
+  qk_pos_emb_head_dim: 64
+  v_head_dim: 128
+
+  # moe args ===================
+  ffn_hidden_size: 12288
+  moe_ffn_hidden_size: 1536
+  moe_grouped_gemm: true
+  moe_shared_expert_intermediate_size: 3072
+  num_experts: 56
+  moe_router_load_balancing_type: "seq_aux_loss"
+  moe_router_score_function: sigmoid
+  moe_router_enable_expert_bias: true
+  moe_router_bias_update_rate: 0.001
+  moe_aux_loss_coeff: 0.02
+  moe_layer_freq: "[0]+[1]*29"
+  # node limited routing
+  moe_router_num_groups: 1
+  moe_router_group_topk: 1
+  moe_router_topk: 6
+  moe_router_topk_scaling_factor: 2.446
+  moe_token_dispatcher_type: "alltoall"
+  # overlap_moe_expert_parallel_comm: true # Optional.
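+  # For reference: moe_layer_freq uses Megatron's Python-style list pattern, so
+  # "[0]+[1]*29" keeps layer 0 as a dense FFN (ffn_hidden_size 12288) and makes the
+  # remaining 29 layers MoE. Each MoE layer routes a token to moe_router_topk = 6 of
+  # the 56 experts, plus a shared expert whose intermediate size (3072) equals two
+  # routed-expert FFNs (2 x 1536).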
+
+  # mtp args ====================
+  # mtp_num_layers: 1
+  # mtp_loss_scaling_factor: 0.3
+
+  # engram args =================
+  use_engram: true
+  engram_tokenizer_name_or_path: /workspace/qwentokenizer
+  engram_vocab_size: [1131200, 1131200]
+  max_ngram_size: 3
+  n_embed_per_ngram: 1280
+  n_head_per_ngram: 8
+  engram_layer_ids: [2, 15]
+  engram_pad_id: 0
+  engram_seed: 0
+  engram_kernel_size: 4
+  engram_hc_mult: 1
+  engram_embedding_parallel_method: alltoall # alltoall or allreduce
+
+  # training
+  seed: ${experiment.seed}
+  finetune: false
+  micro_batch_size: 2
+  global_batch_size: 2048
+  eval_iters: 0
+  train_iters: 20
+
+  optimizer:
+    clip_grad: 1.0
+    weight_decay: 0.1
+    adam_beta1: 0.9
+    adam_beta2: 0.95
+    lr_scheduler:
+      lr: 3.0e-3
+      min_lr: 3.0e-4
+      lr_warmup_fraction: 0.01
+      lr_decay_style: WSD
+      lr_wsd_decay_style: cosine
+      lr_wsd_decay_iters: 10
+
+data:
+  reset_position_ids: True
+  reset_attention_mask: True
+  data_path: /workspace/data/enron_emails_demo_text_document_qwen
+  split: 1
+  no_mmap_bin_files: true
+  tokenizer:
+    legacy_tokenizer: true
+    tokenizer_type: QwenTokenizerFS
+    tokenizer_path: /workspace/qwentokenizer
+    vocab_size: 151851
+    make_vocab_size_divisible_by: 64
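+    # For reference (assuming Megatron's standard padding rule, which this config
+    # does not change): the padded vocab is rounded up to a multiple of
+    # make_vocab_size_divisible_by * TP = 64 * 8 = 512, so 151851 -> 152064 rows in
+    # the word embedding and output layer.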
diff --git a/examples/deepseek_v3/conf/train_engram.yaml b/examples/deepseek_v3/conf/train_engram.yaml
new file mode 100644
index 0000000000..30a4f9fa32
--- /dev/null
+++ b/examples/deepseek_v3/conf/train_engram.yaml
@@ -0,0 +1,51 @@
+defaults:
+  - _self_
+  - train: engram
+
+experiment:
+  exp_name: DeepSeek-Engram
+  seed: 42
+  save_steps: 100
+  load: null
+  exp_dir: outputs/${experiment.exp_name}
+  ckpt_format: torch
+  # ckpt_format: fsdp_dtensor # Only for Megatron FSDP.
+  task:
+    type: train
+    backend: megatron
+    entrypoint: flagscale/train/megatron/train_engram.py
+  runner:
+    # Single-node settings:
+    # per_node_task: false
+    # no_shared_fs: false
+    # rdzv_backend: static
+    # hostfile: null
+    # ssh_port: 10710
+    # Multi-node settings:
+    per_node_task: false
+    no_shared_fs: false
+    backend: torchrun
+    nnodes: 3
+    nproc_per_node: 8
+    hostfile: hostfile # Point to an available hostfile, e.g. "ip_1 slots=8\nip_2 slots=8 ...".
+    master_port: 10720 # Select an available port.
+    ssh_port: 10710 # Select an available port.
+    master_addr:
+    rdzv_backend: static
+    cmds:
+      before_start: ulimit -n 1048576 && source /root/miniconda3/bin/activate /root/miniconda3/envs/flagscale-train
+    envs:
+      LOGLEVEL: "INFO"
+      CUDA_VISIBLE_DEVICES: "0,1,2,3,4,5,6,7"
+      CUDA_DEVICE_MAX_CONNECTIONS: 1
+      NCCL_IB_HCA: "IB interface" # Set to the correct IB interface.
+      NCCL_SOCKET_IFNAME: "IP interface" # Set to the correct network interface.
+      NCCL_IB_DISABLE: 0
+      NCCL_DEBUG: "WARN"
+      NCCL_IB_GID_INDEX: 3
+
+action: run
+
+hydra:
+  run:
+    dir: ${experiment.exp_dir}/hydra
diff --git a/flagscale/models/megatron/engram/engram.py b/flagscale/models/megatron/engram/engram.py
index 2a9a55ab16..12adc4b743 100644
--- a/flagscale/models/megatron/engram/engram.py
+++ b/flagscale/models/megatron/engram/engram.py
@@ -14,6 +14,9 @@
 from .short_conv import ShortConv
 
+## Megatron
+from megatron.core.transformer.utils import sharded_state_dict_default
+
 
 class Engram(nn.Module):
     def __init__(self, engram_cfg: EngramConfig, layer_id):
         super().__init__()
@@ -34,13 +37,17 @@ def __init__(self, engram_cfg: EngramConfig, layer_id):
             pad_id=engram_cfg.engram_pad_id,
             seed=engram_cfg.engram_seed,
         )
-        self.multi_head_embedding = MultiHeadEmbedding(
+        self.memory = MultiHeadEmbedding(
             engram_cfg,
             list_of_N=[
                 x for y in global_hash_mapping.vocab_size_across_layers[self.layer_id] for x in y
             ],
             D=engram_cfg.n_embed_per_ngram // engram_cfg.n_head_per_ngram,
         )
+        self.embedding_cache = None  # Cache for pre-computed embeddings
+        self.embedding_stream = None  # Stream for pre-computing embeddings
+        if torch.cuda.is_available():
+            self.embedding_stream = torch.cuda.Stream()
         self.short_conv = ShortConv(
             hidden_size=self.backbone_config.hidden_size,
             kernel_size=engram_cfg.engram_kernel_size,
@@ -81,8 +88,14 @@ def forward(self, hidden_states, hash_input_ids):
         # [B, L, N_GRAM * N_HEADS_PER_GRAM]
         # fake hyper-connection
         hidden_states = hidden_states.unsqueeze(2)
-
-        embeddings = self.multi_head_embedding(hash_input_ids).flatten(start_dim=-2)
+        if self.embedding_cache is not None:
+            embeddings, embedding_event = self.embedding_cache
+            if embedding_event is not None:
+                # Make sure the pre-computed embeddings are ready before use.
+                torch.cuda.current_stream().wait_event(embedding_event)
+            self.embedding_cache = None  # Clear the cache after use.
+            del embedding_event  # Free the event.
+        else:
+            embeddings = self.memory(hash_input_ids).flatten(start_dim=-2)
 
         # [L/tp_size, B, N_GRAM * N_HEADS_PER_GRAM, N_EMBED_PER_GRAM // N_HEADS_PER_GRAM]
         # [L/tp_size, B, N_GRAM * N_EMBED_PER_NGRAM]
@@ -120,3 +133,34 @@ def forward(self, hidden_states, hash_input_ids):
             output = output.squeeze(2)
 
         return output
+
+    def pre_compute_embedding(self, input_ids: torch.Tensor):
+        """
+        Pre-compute the multi-head embedding for the given input IDs.
+        This can be called before the forward pass to warm up the embedding cache.
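+
+        Illustrative call order (names as used in this module):
+            engram.pre_compute_embedding(hash_input_ids)   # lookup runs on embedding_stream
+            out = engram(hidden_states, hash_input_ids)    # forward waits on the recorded event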
+        """
+        assert input_ids is not None, "Input ids cannot be None for EngramModel"
+        self.embedding_stream.synchronize()  # Ensure previous work on the stream has finished.
+        with torch.cuda.stream(self.embedding_stream):
+            embedding_result = self.memory(input_ids).flatten(start_dim=-2)
+            embedding_event = torch.cuda.Event()
+            embedding_event.record(self.embedding_stream)
+        self.embedding_cache = (embedding_result, embedding_event)
+
+    def sharded_state_dict(
+        self, prefix: str = "", sharded_offsets: tuple = (), metadata: dict | None = None
+    ):
+        sharded_dict = {}
+        memory_prefix = f"{prefix}memory."
+        sharded_dict.update(self.memory.sharded_state_dict(memory_prefix, sharded_offsets, metadata))
+        conv_prefix = f"{prefix}short_conv."
+        sharded_dict.update(sharded_state_dict_default(self.short_conv, conv_prefix, sharded_offsets, metadata))
+        value_proj_prefix = f"{prefix}value_proj."
+        sharded_dict.update(sharded_state_dict_default(self.value_proj, value_proj_prefix, sharded_offsets, metadata))
+        key_projs_prefix = f"{prefix}key_projs."
+        sharded_dict.update(sharded_state_dict_default(self.key_projs, key_projs_prefix, sharded_offsets, metadata))
+        norm1_prefix = f"{prefix}norm1."
+        sharded_dict.update(sharded_state_dict_default(self.norm1, norm1_prefix, sharded_offsets, metadata))
+        norm2_prefix = f"{prefix}norm2."
+        sharded_dict.update(sharded_state_dict_default(self.norm2, norm2_prefix, sharded_offsets, metadata))
+        return sharded_dict
diff --git a/flagscale/models/megatron/engram/engram_config.py b/flagscale/models/megatron/engram/engram_config.py
index 50e77e9dfe..82b15c3894 100644
--- a/flagscale/models/megatron/engram/engram_config.py
+++ b/flagscale/models/megatron/engram/engram_config.py
@@ -17,3 +17,6 @@ class EngramConfig(MLATransformerConfig):
     engram_seed: int = 0
     engram_kernel_size: int = 1
     engram_hc_mult: int = 1
+    engram_embedding_parallel_size: int | None = 1
+    engram_embedding_parallel_method: str = "alltoall"
+    engram_offload_embedding_optimizer_states: bool = False
diff --git a/flagscale/models/megatron/engram/engram_model.py b/flagscale/models/megatron/engram/engram_model.py
index b827ad5f1e..ebdcbbf541 100644
--- a/flagscale/models/megatron/engram/engram_model.py
+++ b/flagscale/models/megatron/engram/engram_model.py
@@ -1,5 +1,7 @@
 # ruff: noqa: RUF013
 ## built-in
+from typing import Optional
+
 import torch
 from torch import Tensor
@@ -27,26 +29,41 @@ def __init__(self, hash_mapping, input_ids, hash_stream=None):
         self.input_ids = input_ids
         self.hash_stream = hash_stream
         self._result = None
-        self._computation_started = False
-
-        # torch.cuda.nvtx.range_push("LazyHashInputIds hash")
-        # Start async computation immediately if stream is available
+        self._is_async_pending = False
+        # Start the async hash computation immediately if a stream is available.
         if self.hash_stream is not None:
+            # self.hash_stream.wait_stream(torch.cuda.current_stream())
            with torch.cuda.stream(self.hash_stream):
                 self._result = self.hash_mapping.hash(self.input_ids)
-            self._computation_started = True
-        # torch.cuda.nvtx.range_pop()
+            self._is_async_pending = True
+            # Record the result tensors so they can be used across streams.
+            self._record_current_stream()
 
-    def __getitem__(self, key):
-        """Access hash result, synchronizing if necessary."""
+    def _record_current_stream(self):
+        """Helper to record the current stream on all result tensors."""
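+        # record_stream marks these tensors as in use on the consumer stream, so the
+        # caching allocator does not reuse their memory while that stream may still
+        # be reading them.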
+ # print(f"[rank{torch.distributed.get_rank()}]: LazyHashInputIds result = {self._result}") return self._result[key] def get(self, key, default=None): @@ -171,7 +188,40 @@ def forward( inference_context=inference_context, ) - def sharded_state_dict( - self, prefix: str = "", sharded_offsets: tuple = (), metadata: dict | None = None + def build_schedule_plan( + self, + input_ids: Tensor, + position_ids: Tensor, + attention_mask: Tensor, + decoder_input: Tensor = None, + labels: Tensor = None, + inference_context: BaseInferenceContext = None, + packed_seq_params: PackedSeqParams = None, + extra_block_kwargs: dict = None, + runtime_gather_output: Optional[bool] = None, + inference_params: Optional[BaseInferenceContext] = None, + loss_mask: Optional[Tensor] = None, ): - raise NotImplementedError("Sharded state dict is not supported for EngramModel") + """ + Adaptation of overlap_moe_expert_parallel_comm. + """ + # Precompute the engram_hash_iput_ids, it will be used to create a TransformerChunkSchedulePlan. + engram_hash_input_ids = LazyHashInputIds( + hash_mapping=self.engram_hash, + input_ids=input_ids, + hash_stream=self._hash_stream, + ) + if extra_block_kwargs is None: + extra_block_kwargs = { + "engram_hash_input_ids": engram_hash_input_ids, + } + return super().build_schedule_plan( + input_ids, + position_ids, + attention_mask, + decoder_input, + labels=labels, + loss_mask=loss_mask, + extra_block_kwargs=extra_block_kwargs + ) + diff --git a/flagscale/models/megatron/engram/engram_transformer_layer.py b/flagscale/models/megatron/engram/engram_transformer_layer.py index 9274206958..bbf08a514a 100644 --- a/flagscale/models/megatron/engram/engram_transformer_layer.py +++ b/flagscale/models/megatron/engram/engram_transformer_layer.py @@ -81,11 +81,18 @@ def forward( sequence_len_offset=sequence_len_offset, inference_params=inference_params, ) + + def pre_compute_embedding(self, hash_input_ids: Tensor): + self.engram.pre_compute_embedding(hash_input_ids) def sharded_state_dict( self, prefix: str = "", sharded_offsets: tuple = (), metadata: dict | None = None ): - raise NotImplementedError("Sharded state dict is not supported for EngramTransformerLayer") + sharded_dict = super().sharded_state_dict(prefix, sharded_offsets, metadata) + engram_prefix = f"{prefix}engram." 
+        """
+        # Precompute the engram hash input ids; they are used to create a
+        # TransformerChunkSchedulePlan.
+        engram_hash_input_ids = LazyHashInputIds(
+            hash_mapping=self.engram_hash,
+            input_ids=input_ids,
+            hash_stream=self._hash_stream,
+        )
+        if extra_block_kwargs is None:
+            extra_block_kwargs = {
+                "engram_hash_input_ids": engram_hash_input_ids,
+            }
+        return super().build_schedule_plan(
+            input_ids,
+            position_ids,
+            attention_mask,
+            decoder_input,
+            labels=labels,
+            loss_mask=loss_mask,
+            extra_block_kwargs=extra_block_kwargs,
+        )
diff --git a/flagscale/models/megatron/engram/engram_transformer_layer.py b/flagscale/models/megatron/engram/engram_transformer_layer.py
index 9274206958..bbf08a514a 100644
--- a/flagscale/models/megatron/engram/engram_transformer_layer.py
+++ b/flagscale/models/megatron/engram/engram_transformer_layer.py
@@ -81,11 +81,18 @@ def forward(
             sequence_len_offset=sequence_len_offset,
             inference_params=inference_params,
         )
+
+    def pre_compute_embedding(self, hash_input_ids: Tensor):
+        self.engram.pre_compute_embedding(hash_input_ids)
 
     def sharded_state_dict(
         self, prefix: str = "", sharded_offsets: tuple = (), metadata: dict | None = None
     ):
-        raise NotImplementedError("Sharded state dict is not supported for EngramTransformerLayer")
+        sharded_dict = super().sharded_state_dict(prefix, sharded_offsets, metadata)
+        engram_prefix = f"{prefix}engram."
+        engram_sharded = self.engram.sharded_state_dict(engram_prefix, sharded_offsets, metadata)
+        sharded_dict.update(engram_sharded)
+        return sharded_dict
 
 
 class EngramTransformerBlock(TransformerBlock):
@@ -283,6 +290,14 @@ def forward(
             # Build kwargs based on layer type
             layer_kwargs = {}
 
+            # Pre-compute embeddings for the next EngramTransformerLayer, if any, so
+            # the lookup overlaps with the current layer's computation.
+            if l_no < len(self.layers) - 1:
+                next_layer = self.layers[l_no + 1]
+                if isinstance(next_layer, EngramTransformerLayer):
+                    engram_hash_layer_id = next_layer.layer_number - 1
+                    hash_input_ids = engram_hash_input_ids[engram_hash_layer_id]
+                    next_layer.pre_compute_embedding(hash_input_ids)
+
             # Only pass input_ids to EngramTransformerLayer
             if isinstance(layer, EngramTransformerLayer):
                 layer_kwargs["input_ids"] = input_ids
@@ -333,8 +348,3 @@ def forward(
             hidden_states = hidden_states.clone()
 
         return hidden_states
-
-    def sharded_state_dict(
-        self, prefix: str = "", sharded_offsets: tuple = (), metadata: dict = None
-    ):
-        raise NotImplementedError("Sharded state dict is not supported for EngramTransformerBlock")
diff --git a/flagscale/models/megatron/engram/multi_head_embedding.py b/flagscale/models/megatron/engram/multi_head_embedding.py
index fd999d6d13..3be3cbdd41 100644
--- a/flagscale/models/megatron/engram/multi_head_embedding.py
+++ b/flagscale/models/megatron/engram/multi_head_embedding.py
@@ -1,14 +1,21 @@
 ## built-in
-
+from typing import Callable, Optional, Tuple
 ## third-party
 import math
 
 import torch
 import torch.nn as nn
+import torch.nn.functional as F
+from torch.nn.parameter import Parameter
 
 # megatron-core
 from megatron.core import tensor_parallel
-from megatron.core.utils import get_pg_size, get_tensor_model_parallel_group_if_none
+from megatron.core.utils import get_pg_size, get_pg_rank, get_tensor_model_parallel_group_if_none
+from megatron.core.tensor_parallel.utils import VocabUtility
+from megatron.core.tensor_parallel.layers import _initialize_affine_weight_cpu, _initialize_affine_weight_gpu
+from megatron.core import parallel_state
+from megatron.core.model_parallel_config import ModelParallelConfig
+from megatron.core.dist_checkpointing.mapping import ShardedTensor
 
 # engram
 from .engram_config import EngramConfig
@@ -24,6 +31,218 @@ def _vocab_size_with_padding(orig_vocab_size, tp_size):
     return after
 
 
+class EngramMemory(nn.Module):
+    """Embedding parallelized in the vocabulary dimension.
+
+    Mainly adapted from torch.nn.Embedding; all default values are kept.
+
+    Unlike the MCore VocabParallelEmbedding, this embedding is parallelized the way
+    expert parallelism is: the parallel group is a subset of the data parallel group,
+    sized by engram_embedding_parallel_size. Each rank receives different input, and
+    during the forward pass tokens are routed to their owning ranks with an
+    all-to-all.
+
+    TODO: The all-to-all version is experimental; we currently reuse the
+    expert_model_parallel_group for performance tuning.
+
+    Args:
+        num_embeddings: vocabulary size.
+        embedding_dim: size of hidden state.
+
+    Keyword Args:
+        init_method: A Callable used to initialize the weight.
+        config: An EngramConfig object.
+        embedding_parallel_group: vocab parallel group, a torch.distributed.ProcessGroup object.
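+
+    Routing sketch (assuming the even split produced by
+    VocabUtility.vocab_range_from_global_vocab_size): token id t is owned by rank
+    t // num_embeddings_per_partition, e.g. a padded vocab of 1024 over 4 ranks
+    gives 256 rows per rank, and id 700 routes to rank 2.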
+ """ + + def __init__( + self, + num_embeddings: int, + embedding_dim: int, + *, + init_method: Callable, + reduce_scatter_embeddings: bool = False, + config: ModelParallelConfig, + embedding_parallel_group: Optional[torch.distributed.ProcessGroup] = None, + ): + super().__init__() + # Keep the input dimensions. + self.num_embeddings = num_embeddings + self.embedding_dim = embedding_dim + self.reduce_scatter_embeddings = reduce_scatter_embeddings + self.embedding_parallel_group = embedding_parallel_group + if self.embedding_parallel_group is None: + self.embedding_parallel_size = 1 + self.embedding_parallel_rank = 0 + else: + self.embedding_parallel_size = get_pg_size(self.embedding_parallel_group) + self.embedding_parallel_rank = get_pg_rank(self.embedding_parallel_group) + + (self.vocab_start_index, self.vocab_end_index) = ( + VocabUtility.vocab_range_from_global_vocab_size( + self.num_embeddings, self.embedding_parallel_rank, self.embedding_parallel_size + ) + ) + self.num_embeddings_per_partition = self.vocab_end_index - self.vocab_start_index + self.deterministic_mode = config.deterministic_mode + + # Allocate weights and initialize. + if config.use_cpu_initialization: + self.weight = Parameter( + torch.empty( + self.num_embeddings_per_partition, self.embedding_dim, dtype=config.params_dtype + ) + ) + if config.perform_initialization: + _initialize_affine_weight_cpu( + self.weight, + self.num_embeddings, + self.embedding_dim, + self.num_embeddings_per_partition, + 0, + init_method, + params_dtype=config.params_dtype, + rank=self.embedding_parallel_rank, + world_size=self.embedding_parallel_size, + ) + else: + self.weight = Parameter( + torch.empty( + self.num_embeddings_per_partition, + self.embedding_dim, + device=torch.cuda.current_device(), + dtype=config.params_dtype, + ) + ) + if config.perform_initialization: + _initialize_affine_weight_gpu(self.weight, init_method, partition_dim=0, stride=1) + + def enable_parallel(self): + if self.embedding_parallel_size > 1: + setattr(self.weight, "is_engram_embedding", True) + setattr(self.weight, "allreduce", False) + + def enable_offloading(self): + setattr(self.weight, "is_offloading_candidate", True) + + def _dispatch(self, input_ids): + torch.cuda.nvtx.range_push("engram_embedding_dispatch") + self.hidden_shape = input_ids.shape + input_ids = input_ids.view(-1) + routing_map = input_ids // self.num_embeddings_per_partition + # [num_partitions], number of tokens assigned to each partition from the current rank's input. + num_tokens_per_partition = torch.bincount( + routing_map, + minlength=self.embedding_parallel_size, + ).to(dtype=torch.int64) + # Reorder the token indices to match the order of partitions. + # Shape = (batch * seqlen, ). + token_indices_partitions_sorted = torch.argsort(routing_map, stable=True) + # Shape = (batch * seqlen, ). + routed_input = input_ids[token_indices_partitions_sorted] + # Use to unsort. 
+        # Generate the input splits and output splits for the all-to-all.
+        with torch.no_grad():
+            output_splits_cuda = tensor_parallel.all_to_all(
+                self.embedding_parallel_group,
+                num_tokens_per_partition,
+                None,
+                None,
+            )
+            # Wait explicitly because the result is used by a triton kernel later,
+            # which does not realize that an AsyncCollectiveTensor needs unwrapping.
+            output_splits_cuda = torch.ops._c10d_functional.wait_tensor(output_splits_cuda)
+            input_splits = (
+                num_tokens_per_partition.view(self.embedding_parallel_size, -1)
+                .sum(dim=1)
+                .to(torch.device("cpu"), non_blocking=True)
+            )
+            # NOTE: this incurs a device-to-host sync.
+            output_splits = (
+                output_splits_cuda.view(self.embedding_parallel_size, -1)
+                .sum(dim=1)
+                .to(torch.device("cpu"), non_blocking=False)
+            )
+        self.input_splits = input_splits.tolist()
+        self.output_splits = output_splits.tolist()
+
+        # Perform the all-to-all.
+        routed_input = tensor_parallel.all_to_all(
+            self.embedding_parallel_group, routed_input, self.output_splits, self.input_splits
+        )
+        routed_input = routed_input - self.vocab_start_index
+        torch.cuda.nvtx.range_pop()
+        return routed_input
+
+    def _combine(self, hidden_states: torch.Tensor):
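+        """Inverse of _dispatch: run the all-to-all with the input/output splits
+        swapped, unsort the tokens back to their original order, then reshape to the
+        original input shape plus the embedding dim."""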
+        torch.cuda.nvtx.range_push("engram_embedding_combine")
+        routed_hidden_states = tensor_parallel.all_to_all(
+            self.embedding_parallel_group, hidden_states, self.input_splits, self.output_splits
+        )
+        routed_hidden_states = routed_hidden_states[self._token_unsort_indices]
+        hidden_states = routed_hidden_states.view(*self.hidden_shape, -1)
+        torch.cuda.nvtx.range_pop()
+        return hidden_states
+
+    def forward(self, input_: torch.Tensor):
+        """Forward.
+
+        Args:
+            input_ (torch.Tensor): Input tensor, shape (b, s), dtype = torch.int64.
+        """
+        torch.cuda.nvtx.range_push("engram_embedding_forward")
+        if self.reduce_scatter_embeddings:
+            tp_size = parallel_state.get_tensor_model_parallel_world_size()
+            tp_rank = parallel_state.get_tensor_model_parallel_rank()
+            num_tokens_per_sp_rank = input_.shape[1] // tp_size
+            if tp_rank < tp_size - 1:
+                input_ = input_[:, tp_rank * num_tokens_per_sp_rank : (tp_rank + 1) * num_tokens_per_sp_rank]
+            else:
+                input_ = input_[:, tp_rank * num_tokens_per_sp_rank :]
+            input_ = input_.contiguous()
+        if self.embedding_parallel_size > 1:
+            input_ = self._dispatch(input_)
+        # Get the embeddings.
+        if self.deterministic_mode:
+            output = self.weight[input_]
+        else:
+            # F.embedding currently has a non-deterministic backward function
+            output = F.embedding(input_, self.weight)
+        # Get the complete output embedding
+        if self.embedding_parallel_size > 1:
+            output = self._combine(output)
+        if self.reduce_scatter_embeddings:
+            output = output.transpose(0, 1).contiguous()
+        torch.cuda.nvtx.range_pop()
+        return output
+
+    def sharded_state_dict(
+        self,
+        prefix: str = '',
+        sharded_offsets: Tuple[Tuple[int, int, int]] = (),
+        metadata: Optional[dict] = None,
+        **kwargs,
+    ):
+        state_dict = self.state_dict(prefix="", keep_vars=True)
+        weight_prefix = f"{prefix}weight"
+        prepend_axis_num = len(sharded_offsets)
+        new_offsets = []
+        tp_rank = self.embedding_parallel_rank
+        tp_size = self.embedding_parallel_size
+        dp_replica_id = get_pg_rank(parallel_state.get_engram_data_parallel_group())
+        new_offsets.append((prepend_axis_num, tp_rank, tp_size))
+
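+        # The weight is sharded along dim 0 across the embedding parallel group (the
+        # new_offsets entry above); ranks in the engram data parallel group hold
+        # identical copies, so the dp rank is used as the replica id to mark them as
+        # replicas rather than shards.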
+        replica_id = (0, 0, dp_replica_id)
+        sharded_tensor = ShardedTensor.from_rank_offsets(
+            weight_prefix,
+            state_dict["weight"],
+            *sharded_offsets,
+            *new_offsets,
+            replica_id=replica_id,
+            allow_shape_mismatch=True,
+            **kwargs,
+        )
+        return {weight_prefix: sharded_tensor}
+
+
 class MultiHeadEmbedding(nn.Module):
     def __init__(self, engram_cfg: EngramConfig, list_of_N: list[int], D: int):
         super().__init__()
@@ -40,25 +259,52 @@ def __init__(self, engram_cfg: EngramConfig, list_of_N: list[int], D: int):
         total_N = sum(list_of_N)
 
         # embeddings (parallel).
-        self.tp_group = get_tensor_model_parallel_group_if_none(tp_group=None)
-        self.reduce_scatter_embeddings = self.engram_cfg.sequence_parallel
-
-        padded_total_N = _vocab_size_with_padding(total_N, get_pg_size(self.tp_group))
-        print(f"Engram multi-head embedding: pad total_n from {total_N} to {padded_total_N}")
-
-        self.embedding = tensor_parallel.VocabParallelEmbedding(
-            num_embeddings=padded_total_N,
-            embedding_dim=D,
-            init_method=self.engram_cfg.embedding_init_method,
-            reduce_scatter_embeddings=self.reduce_scatter_embeddings,
-            config=self.engram_cfg,
-            tp_group=self.tp_group,
-        )
+        if self.engram_cfg.engram_embedding_parallel_method == "allreduce":
+            self.tp_group = get_tensor_model_parallel_group_if_none(tp_group=None)
+            self.reduce_scatter_embeddings = self.engram_cfg.sequence_parallel
+
+            padded_total_N = _vocab_size_with_padding(total_N, get_pg_size(self.tp_group))
+            print(f"Engram multi-head embedding: pad total_n from {total_N} to {padded_total_N}")
+
+            self.memory = tensor_parallel.VocabParallelEmbedding(
+                num_embeddings=padded_total_N,
+                embedding_dim=D,
+                init_method=self.engram_cfg.embedding_init_method,
+                reduce_scatter_embeddings=self.reduce_scatter_embeddings,
+                config=self.engram_cfg,
+                tp_group=self.tp_group,
+            )
+        else:
+            self.embedding_parallel_group = parallel_state.get_engram_embedding_parallel_group()
+            self.reduce_scatter_embeddings = self.engram_cfg.sequence_parallel
+            padded_total_N = _vocab_size_with_padding(
+                total_N, get_pg_size(self.embedding_parallel_group)
+            )
+            print(f"Engram multi-head embedding: pad total_n from {total_N} to {padded_total_N}")
+            self.memory = EngramMemory(
+                num_embeddings=padded_total_N,
+                embedding_dim=D,
+                init_method=self.engram_cfg.embedding_init_method,
+                reduce_scatter_embeddings=self.reduce_scatter_embeddings,
+                config=self.engram_cfg,
+                embedding_parallel_group=self.embedding_parallel_group,
+            )
+            if self.engram_cfg.engram_embedding_parallel_method == "alltoall":
+                self.memory.enable_parallel()
+                if self.engram_cfg.engram_offload_embedding_optimizer_states:
+                    self.memory.enable_offloading()
+            else:
+                raise ValueError(
+                    f"Unsupported engram_embedding_parallel_method: "
+                    f"{self.engram_cfg.engram_embedding_parallel_method}"
+                )
 
     def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
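+        # self.offsets (built from list_of_N in the unchanged __init__ code) shifts
+        # each head's local n-gram ids into its own slice of the concatenated
+        # embedding table, so one fused lookup serves all heads.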
         shifted_input_ids = input_ids + self.offsets
-        output = self.embedding(shifted_input_ids)
+        output = self.memory(shifted_input_ids)
         if not self.reduce_scatter_embeddings:
             output = output.transpose(0, 1).contiguous()
         return output
+
+    def sharded_state_dict(
+        self, prefix: str = "", sharded_offsets: tuple = (), metadata: dict | None = None
+    ):
+        sharded_dict = {}
+        memory_prefix = f"{prefix}memory."
+        memory_sharded_dict = self.memory.sharded_state_dict(memory_prefix, sharded_offsets, metadata)
+        sharded_dict.update(memory_sharded_dict)
+        return sharded_dict
diff --git a/flagscale/train/megatron/train_engram.py b/flagscale/train/megatron/train_engram.py
index 5e1bff6340..2129a439b5 100644
--- a/flagscale/train/megatron/train_engram.py
+++ b/flagscale/train/megatron/train_engram.py
@@ -15,10 +15,10 @@
 from megatron.core.rerun_state_machine import get_rerun_state_machine
 from megatron.core.utils import get_attr_wrapped_model, StragglerDetector
 from megatron.core.tokenizers.text.utils.build_tokenizer import build_tokenizer
-from megatron.core import mpu
 from megatron.training import get_args, get_timers, get_tokenizer, print_rank_0
 from megatron.training.utils import (
     get_batch_on_this_cp_rank,
+    get_batch_on_this_tp_rank,
     get_blend_and_blend_per_split,
     is_first_or_last_pipeline_stage,
 )
@@ -43,143 +43,6 @@
 stimer = StragglerDetector()
 
 
-def get_batch_on_this_tp_rank(data_iterator):
-
-    args = get_args()
-
-    def _broadcast(item):
-        if item is not None:
-            torch.distributed.broadcast(
-                item,
-                mpu.get_tensor_model_parallel_src_rank(),
-                group=mpu.get_tensor_model_parallel_group(),
-            )
-
-    if mpu.get_tensor_model_parallel_rank() == 0:
-
-        assert data_iterator is not None
-        data = next(data_iterator)
-        batch = {
-            'tokens': data["tokens"].cuda(non_blocking=True),
-            'labels': data["labels"].cuda(non_blocking=True),
-            'loss_mask': data["loss_mask"].cuda(non_blocking=True),
-            'attention_mask': (
-                None
-                if "attention_mask" not in data
-                else data["attention_mask"].cuda(non_blocking=True)
-            ),
-            'position_ids': data["position_ids"].cuda(non_blocking=True),
-        }
-
-        if args.pipeline_model_parallel_size == 1:
-            _broadcast(batch['tokens'])
-            _broadcast(batch['labels'])
-            _broadcast(batch['loss_mask'])
-            _broadcast(batch['attention_mask'])
-            _broadcast(batch['position_ids'])
-
-        elif mpu.is_pipeline_first_stage():
-            _broadcast(batch['tokens'])
-            _broadcast(batch['attention_mask'])
-            _broadcast(batch['position_ids'])
-            ######### FlagScale Begin ########
-            if mpu.get_dualpipev_pipeline_model_parallel_world_size() is not None:
-                _broadcast(batch['loss_mask'])
-                _broadcast(batch['labels'])
-            ######### FlagScale End ########
-
-        elif mpu.is_pipeline_last_stage():
-            # Multi-Token Prediction (MTP) layers need tokens and position_ids to calculate embedding.
-            # Currently the Multi-Token Prediction (MTP) layers is fixed on the last stage, so we need
-            # to broadcast tokens and position_ids to all of the tensor parallel ranks on the last stage.
-            _broadcast(batch['tokens'])
-            if args.mtp_num_layers is not None:
-                _broadcast(batch['position_ids'])
-            _broadcast(batch['labels'])
-            _broadcast(batch['loss_mask'])
-            _broadcast(batch['attention_mask'])
-
-        else:
-            _broadcast(batch['tokens'])
-
-    else:
-
-        tokens = torch.empty(
-            (args.micro_batch_size, args.seq_length),
-            dtype=torch.int64,
-            device=torch.cuda.current_device(),
-        )
-        labels = torch.empty(
-            (args.micro_batch_size, args.seq_length),
-            dtype=torch.int64,
-            device=torch.cuda.current_device(),
-        )
-        loss_mask = torch.empty(
-            (args.micro_batch_size, args.seq_length),
-            dtype=torch.float32,
-            device=torch.cuda.current_device(),
-        )
-        if args.create_attention_mask_in_dataloader:
-            attention_mask = torch.empty(
-                (args.micro_batch_size, 1, args.seq_length, args.seq_length),
-                dtype=torch.bool,
-                device=torch.cuda.current_device(),
-            )
-        else:
-            attention_mask = None
-        position_ids = torch.empty(
-            (args.micro_batch_size, args.seq_length),
-            dtype=torch.int64,
-            device=torch.cuda.current_device(),
-        )
-
-        if args.pipeline_model_parallel_size == 1:
-            _broadcast(tokens)
-            _broadcast(labels)
-            _broadcast(loss_mask)
-            _broadcast(attention_mask)
-            _broadcast(position_ids)
-
-        elif mpu.is_pipeline_first_stage():
-            _broadcast(tokens)
-            _broadcast(attention_mask)
-            _broadcast(position_ids)
-            ######### FlagScale Modify ########
-            if mpu.get_dualpipev_pipeline_model_parallel_world_size() is not None:
-                _broadcast(loss_mask)
-                _broadcast(labels)
-            else:
-                labels = None
-                loss_mask = None
-
-        elif mpu.is_pipeline_last_stage():
-            # Multi-Token Prediction (MTP) layers need tokens and position_ids to calculate embedding.
-            # Currently the Multi-Token Prediction (MTP) layers is fixed on the last stage, so we need
-            # to broadcast tokens and position_ids to all of the tensor parallel ranks on the last stage.
-            _broadcast(tokens)
-            if args.mtp_num_layers is not None:
-                _broadcast(position_ids)
-            else:
-                position_ids = None
-
-            _broadcast(labels)
-            _broadcast(loss_mask)
-            _broadcast(attention_mask)
-
-        else:
-            _broadcast(tokens)
-
-        batch = {
-            'tokens': tokens,
-            'labels': labels,
-            'loss_mask': loss_mask,
-            'attention_mask': attention_mask,
-            'position_ids': position_ids,
-        }
-
-    return batch
-
-
 def get_batch(data_iterator, vp_stage=None):
     """Generate a batch."""
     # # TODO: this is pretty hacky, find a better way
@@ -188,8 +51,31 @@ def get_batch(data_iterator, vp_stage=None):
 
     # get batches based on the TP rank you are on
     batch = get_batch_on_this_tp_rank(data_iterator)
-
+    ## NOTE: This should ideally be handled inside get_batch_on_this_tp_rank. To
+    ## minimize the impact of engram on the framework's internals, it is kept outside
+    ## that function for now; a better solution will follow.
+    # Broadcast tokens within the TP group for every pipeline stage except those
+    # where the function above has already broadcast them:
+    #   1. pp_size == 1
+    #   2. the first pipeline stage
+    #   3. the last pipeline stage when MTP is enabled
+    args = get_args()
+    already_broadcast_tokens = (
+        (parallel_state.get_pipeline_model_parallel_world_size() == 1)
+        or parallel_state.is_pipeline_first_stage()
+        or (parallel_state.is_pipeline_last_stage() and args.mtp_num_layers is not None)
+    )
+    if not already_broadcast_tokens:
+        if parallel_state.get_tensor_model_parallel_rank() == 0:
+            torch.distributed.broadcast(
+                batch["tokens"],
+                src=parallel_state.get_tensor_model_parallel_src_rank(),
+                group=parallel_state.get_tensor_model_parallel_group(),
+            )
+        else:
+            # Allocate a placeholder tensor to receive the broadcasted tokens.
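+            # (Its shape and dtype must match what TP rank 0 sends: (micro_batch_size,
+            # seq_length), torch.long, on the current CUDA device.)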
+            tokens = torch.empty(
+                (args.micro_batch_size, args.seq_length),
+                dtype=torch.long,
+                device=torch.cuda.current_device(),
+            )
+            torch.distributed.broadcast(
+                tokens,
+                src=parallel_state.get_tensor_model_parallel_src_rank(),
+                group=parallel_state.get_tensor_model_parallel_group(),
+            )
+            batch["tokens"] = tokens
     # slice batch along sequence dimension for context parallelism
+    # It is unclear why the broadcast here needs a synchronize while
+    # get_batch_on_this_tp_rank does not; keep it until the root cause is found.
+    torch.cuda.synchronize()
     batch = get_batch_on_this_cp_rank(batch)
 
     return batch.values()
diff --git a/flagscale/train/megatron/training/arguments_fs.py b/flagscale/train/megatron/training/arguments_fs.py
index 1bf27fc328..c7d707e6f2 100644
--- a/flagscale/train/megatron/training/arguments_fs.py
+++ b/flagscale/train/megatron/training/arguments_fs.py
@@ -371,71 +371,34 @@ def _parse_recompute_refined_config(recom_config, recom_config_name):
         assert args.recompute_method is None and args.recompute_granularity is None and args.recompute_num_layers is None, "PEFT will raise comfilcts with recompute currently"
         assert args.ckpt_format == 'torch', "PEFT is only tested with torch format checkpoint"
 
-    # DualPipeV related
-    if args.use_dualpipev:
-        assert args.pipeline_model_parallel_size > 1, (
-            "DualPipeV can only be used for pipeline scheduling in MoE models, "
-            "thus requiring both pipeline parallelism and expert parallelism."
-        )
-        assert args.expert_model_parallel_size > 1, (
-            "DualPipeV can only be used for pipeline scheduling in MoE models, "
-            "thus requiring both pipeline parallelism and expert parallelism."
-        )
-
-        middle_stage_layers = args.num_layers
-        num_middle_stages = args.pipeline_model_parallel_size
-        if args.decoder_first_pipeline_num_layers is not None:
-            middle_stage_layers = middle_stage_layers - args.decoder_first_pipeline_num_layers
-            num_middle_stages = num_middle_stages - 1
-            assert args.decoder_first_pipeline_num_layers % 2 == 0, (
-                "The first pipeline stage must contain an even number of Transformer layers, "
-                "so that DualPipeV can split it into two model chunks."
-            )
-        if args.decoder_last_pipeline_num_layers is not None:
-            middle_stage_layers = middle_stage_layers - args.decoder_last_pipeline_num_layers
-            num_middle_stages = num_middle_stages - 1
-            assert args.decoder_last_pipeline_num_layers % 2 == 0, (
-                "The last pipeline stage must contain an even number of Transformer layers, "
-                "so that DualPipeV can split it into two model chunks."
-            )
-        if num_middle_stages > 0:
-            assert middle_stage_layers > 0, "Layers can not be empty"
-            assert middle_stage_layers % num_middle_stages == 0, "Layers must be even split"
-            num_layers_in_middle_stages = middle_stage_layers // num_middle_stages
-            assert num_layers_in_middle_stages % 2 == 0, (
-                "The middle pipeline stage must contain an even number of Transformer layers, "
-                "so that DualPipeV can split it into two model chunks."
-            )
-
-        assert args.moe_shared_expert_overlap is False, (
-            " DualPipeV does not support simultaneous use with moe_shared_expert_overlap currently."
-        )
-
-        if args.moe_fb_overlap:
-            assert args.overlap_grad_reduce is False and args.overlap_param_gather is False, (
-                " DualPipeV configured with moe_fb_overlap is incompatible with either overlap_grad_reduce or overlap_param_gather. "
-                " When moe_fb_overlap is enabled, DualPipeV activates the DW-split mechanism provided by Transformer Engine, "
" - " This absence of gradient tensors violates the assumptions of both overlap_grad_reduce and overlap_param_gather, precipitating an assertion failure within DDP." - ) - assert not args.moe_use_legacy_grouped_gemm, \ - 'delay_wgrad_compute is not supported with legacy groupedgemm implementation' - assert args.transformer_impl == 'transformer_engine', \ - 'delay_wgrad_compute is only supported with transformer_engine implementation' - - assert args.untie_embeddings_and_output_weights is True, ( - " DualPipeV is not supported with shared embedding and lm head" - ) - assert args.mtp_num_layers is None, ( - "DualPipeV is not supported with multi-token-predictor currently" - ) - - if args.peft_type is not None: - assert args.transformer_impl == 'transformer_engine', \ - 'PEFT is only supported with transformer_engine implementation' - assert args.num_experts is None, "PEFT is not tested with MoE currently" - assert args.recompute_method is None and args.recompute_granularity is None and args.recompute_num_layers is None, "PEFT will raise comfilcts with recompute currently" - assert args.ckpt_format == 'torch', "PEFT is only tested with torch format checkpoint" + # Engram related. + if self.args.use_engram: + if self.args.engram_embedding_parallel_method == "allreduce": + if self.args.rank == 0: + warnings.warn(f"[rank0]: We do not recommend using allreduce for engram embedding, this is deprecated and will be removed in a later version.", DeprecationWarning) + if self.args.engram_embedding_parallel_size is not None: + warnings.warn( + "[rank0]: If set the embedding_parallel_method to allreduce, " \ + "the embedding module will be the tensor_parallel.layers.VocabParallelEmbedding with tensor_parallel." \ + "So the embedding_parallel_size is useless and set to None." + ) + self.args.engram_embedding_parallel_size = None + elif self.args.engram_embedding_parallel_method == "alltoall": + assert self.args.engram_embedding_parallel_size is not None, "embedding parallel size should be specified when using alltoall" + assert self.args.engram_embedding_parallel_size >= self.args.tensor_model_parallel_size, f"Engram parallel size {self.args.engram_embedding_parallel_size} should be greater than or equal to tensor_model_parallel_size {self.args.tensor_model_parallel_size}, otherwise the random seed may be different in engram_dp_ranks. It will be fixed in a later version." + else: + raise ValueError(f"Invalid embedding parallel method: {self.args.engram_embedding_parallel_method}") + if self.args.engram_offload_embedding_optimizer_states: + assert self.args.engram_embedding_parallel_method == "alltoall", f"Offloading embedding optimizer states is only supported when using alltoall for engram embedding parallelism, now is {self.args.engram_embedding_parallel_method}." + assert self.args.optimizer_cpu_offload, "Offloading embedding optimizer states requires optimizer_cpu_offload to be enabled." + warnings.warn("Offloading embedding optimizer states will offload all embedding optimizer states to CPU, which may cause slowdown. " \ + "Please make sure this is what you want. This is typically used to save GPU memory when Engram embedding is large while accelerators are limited." \ + "If you do not want to offload all embedding optimizer states to CPU, please disable this and set the --optimizer-offload-fraction to a value less than 1 to offload part of the optimizer states to CPU." 
\ + "Of cource you can set the --optimizer-offload-fraction to offload other params meanwhile enable this to offload all embedding optimizer states to CPU.") + assert not self.args.use_megatron_fsdp, "Megatron FSDP is not supported yet; support is planned for a later version." + assert not self.args.init_model_with_meta_device, "Init_model_with_meta_device is not supported yet; support is planned for a later version." + assert self.args.use_distributed_optimizer, "When use engram, distributed_optimizer must be enabled, because there is a bug caused by allreduce grad norm in model parallel group when do not use distributed_optimizer. We have not found a pretty solution yet, so disable it temporarily." + assert not (args.pipeline_model_parallel_size == 1 and args.overlap_moe_expert_parallel_comm), "When no pipeline and enable overlap_moe_expert_parallel_comm, a bug will occur, it will be fixed in a later version." def _add_hetero_args(parser): @@ -853,6 +816,25 @@ def _add_engram_args(parser): default=1, help='Hyper-connection multiplicity for Engram', ) + group.add_argument( + '--engram-embedding-parallel-size', + type=int, + default=1, + help='Parallel size for Engram embedding', + ) + group.add_argument( + '--engram-embedding-parallel-method', + type=str, + default="alltoall", + choices=["alltoall", "allreduce"], + help='Parallel method for Engram embedding across embedding parallel(alltoall) / tensor parallel(allreduce) groups', + ) + group.add_argument( + "--engram-offload-embedding-optimizer-states", + action="store_true", + help="Whether to offload Engram embedding optimizer states to CPU when using alltoall for Engram embedding parallelism. " \ + "This is typically used to save GPU memory when Engram embedding is large while accelerators are limited." + ) return parser diff --git a/flagscale/train/megatron/training/initialize.py b/flagscale/train/megatron/training/initialize.py index 607fc71f90..cb01e821d4 100644 --- a/flagscale/train/megatron/training/initialize.py +++ b/flagscale/train/megatron/training/initialize.py @@ -424,6 +424,7 @@ def _initialize_distributed(get_embedding_ranks, get_position_embedding_ranks, s expert_model_parallel_size=args.expert_model_parallel_size, num_distributed_optimizer_instances=args.num_distributed_optimizer_instances, expert_tensor_parallel_size=args.expert_tensor_parallel_size, + engram_embedding_parallel_size=args.engram_embedding_parallel_size, distributed_timeout_minutes=args.distributed_timeout_minutes, nccl_communicator_config_path=args.nccl_communicator_config_path, order='tp-cp-ep-dp-pp' if not args.use_tp_pp_dp_mapping else 'tp-cp-ep-pp-dp', diff --git a/flagscale/train/megatron/training/utils.py b/flagscale/train/megatron/training/utils.py index 8bc80674ec..762a568bec 100644 --- a/flagscale/train/megatron/training/utils.py +++ b/flagscale/train/megatron/training/utils.py @@ -77,6 +77,7 @@ def calc_params_l2_norm(model, force_create_fp32_copy=False): # Seperate moe and dense params params_data = [] moe_params_data = [] + engram_embedding_data = [] sharded_params_data = [] data_parallel_group = None @@ -88,10 +89,16 @@ def calc_params_l2_norm(model, force_create_fp32_copy=False): continue assert is_not_tp_duplicate if not getattr(param, 'allreduce', True): - # TODO: Implement memory optimization for MoE parameters. 
-                assert param_is_not_shared(param)
-                param = to_local_if_dtensor(param)
-                moe_params_data.append(param.data.float() if args.bf16 else param.data)
+                if not getattr(param, "is_engram_embedding", False):
+                    # TODO: Implement memory optimization for MoE parameters.
+                    assert param_is_not_shared(param)
+                    param = to_local_if_dtensor(param)
+                    moe_params_data.append(param.data.float() if args.bf16 else param.data)
+                else:
+                    # Engram embedding param
+                    assert param_is_not_shared(param)
+                    param = to_local_if_dtensor(param)
+                    engram_embedding_data.append(param.data.float() if args.bf16 else param.data)
             else:
                 if param_is_not_shared(param):
                     param = to_local_if_dtensor(param)
@@ -162,6 +169,18 @@ def calc_params_l2_norm(model, force_create_fp32_copy=False):
     # See details in https://gitlab-master.nvidia.com/ADLR/megatron-lm/-/issues/409
     else:
         moe_norm_2 = torch.zeros_like(norm_2)
+
+    # Add the norm contribution from the engram embedding.
+    if len(engram_embedding_data) > 0:
+        engram_embedding_norm, _ = multi_tensor_applier(
+            multi_tensor_l2norm,
+            dummy_overflow_buf,
+            [engram_embedding_data],
+            False,  # no per-parameter norm.
+        )
+        engram_embedding_norm_2 = engram_embedding_norm * engram_embedding_norm
+    else:
+        engram_embedding_norm_2 = torch.zeros_like(norm_2)
 
     ########## FlagScale Begin ##########
     # Sum across all model-parallel GPUs(tensor + pipeline).
@@ -192,6 +211,7 @@ def calc_params_l2_norm(model, force_create_fp32_copy=False):
             moe_norm_2, op=torch.distributed.ReduceOp.SUM, group=emp_group
         )
         norm_2 += moe_norm_2
+        assert len(engram_embedding_data) <= 0, "Engram embedding does not support hetero."
     ########## FlagScale End ##########
     else:
         # original code
@@ -202,6 +222,8 @@ def calc_params_l2_norm(model, force_create_fp32_copy=False):
         # Expert params should sum across all model-parallel GPUs (expert + tensor + pipeline).
         expert_reduce_group = mpu.get_expert_tensor_model_pipeline_parallel_group()
         ranks_in_expert_reduce_group = torch.distributed.get_process_group_ranks(expert_reduce_group)
+        # Engram params should sum across the engram model parallel group (engram
+        # embedding + pipeline); on ranks without an engram module,
+        # engram_module_initialized is False and the param norm contribution is 0.
+        engram_mp_group = mpu.get_engram_model_parallel_group()
 
         # If dense and expert reduce groups are the same, sum then reduce.
         if ranks_in_dense_reduce_group == ranks_in_expert_reduce_group:
@@ -218,10 +240,18 @@ def calc_params_l2_norm(model, force_create_fp32_copy=False):
                 moe_norm_2, op=torch.distributed.ReduceOp.SUM, group=expert_reduce_group
             )
             norm_2 += moe_norm_2
+            # Reduce and add the engram embedding norm if the group exists. Because
+            # engram_mp_group differs from the other two groups in most cases, and to
+            # minimize changes to the original code, it is all-reduced and added
+            # independently here even when the engram embedding parallel group matches
+            # the dense or expert group.
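+            # After these reductions the final value returned below is
+            #     sqrt(dense_norm_2 + moe_norm_2 + engram_embedding_norm_2).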
+            if engram_mp_group is not None:
+                torch.distributed.all_reduce(
+                    engram_embedding_norm_2, op=torch.distributed.ReduceOp.SUM, group=engram_mp_group
+                )
+                norm_2 += engram_embedding_norm_2
 
     if comm_device == "cpu":
         norm_2 = norm_2.to(cur_platform.device())
         moe_norm_2 = moe_norm_2.to(cur_platform.device())
+        engram_embedding_norm_2 = engram_embedding_norm_2.to(cur_platform.device())
 
     return norm_2.item() ** 0.5
diff --git a/tests/functional_tests/train/deepseek/gold_values/tp2_pp2_ep2_engram.json b/tests/functional_tests/train/deepseek/gold_values/tp2_pp2_ep2_engram.json
index 2830ce8a2f..0ea14d2ea9 100644
--- a/tests/functional_tests/train/deepseek/gold_values/tp2_pp2_ep2_engram.json
+++ b/tests/functional_tests/train/deepseek/gold_values/tp2_pp2_ep2_engram.json
@@ -1 +1,16 @@
-{"lm loss:": {"values": [12.31449, 12.31611, 20.91898, 12.73568, 13.30583, 11.24752, 9.966835, 11.10967, 10.74055, 10.14268]}}
+{
+    "lm loss:": {
+        "values": [
+            12.31449,
+            12.31611,
+            20.91898,
+            12.73568,
+            13.30583,
+            11.24752,
+            9.966835,
+            11.10967,
+            10.74055,
+            10.14268
+        ]
+    }
+}