1 change: 1 addition & 0 deletions fastdeploy/config.py
@@ -121,6 +121,7 @@ def __init__(
):
self.model = ""
self.is_quantized = False
self.is_moe_quantized = False
self.max_model_len = 0
self.dtype = ""
self.enable_logprob = False
1 change: 1 addition & 0 deletions fastdeploy/engine/args_utils.py
@@ -1076,6 +1076,7 @@ def create_eplb_config(self) -> EPLBConfig:
if self.eplb_config is not None:
for k, v in self.eplb_config.items():
eplb_args[k] = v
eplb_args["enable_eplb"] = self.enable_eplb
return EPLBConfig(eplb_args)

def create_engine_config(self, port_availability_check: bool = True) -> FDConfig:
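For context, a minimal sketch of the merge order this hunk enforces in `create_eplb_config`: entries from a user-supplied `eplb_config` dict are copied first, then the top-level `enable_eplb` flag is written last, so enabling EPLB at the engine-args level cannot be lost or overridden by the dict. The function below is a simplified stand-in that returns a plain dict instead of the real `EPLBConfig`, and the example values are illustrative only.

```python
# Simplified sketch of the merge order; returns a plain dict, not EPLBConfig.
def create_eplb_args(eplb_config, enable_eplb):
    eplb_args = {}
    if eplb_config is not None:
        for k, v in eplb_config.items():
            eplb_args[k] = v
    # Written last, so the top-level enable_eplb flag always wins over any
    # value carried inside the eplb_config dict.
    eplb_args["enable_eplb"] = enable_eplb
    return eplb_args

print(create_eplb_args({"enable_eplb": False, "redundant_expert_ip_shm_size": 1024}, True))
# -> {'enable_eplb': True, 'redundant_expert_ip_shm_size': 1024}
```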
50 changes: 24 additions & 26 deletions fastdeploy/entrypoints/engine_client.py
@@ -96,7 +96,7 @@ def __init__(
else:
self.is_master = False

if self.config.eplb_config.enable_eplb and self.config.parallel_config.expert_parallel_rank == 0:
if self.config.eplb_config.enable_eplb:
self.init_eplb_signals(ipc_signal_suffix=port)

array_size = min(max_chips_per_node, tensor_parallel_size)
@@ -126,17 +126,21 @@ def init_eplb_signals(self, ipc_signal_suffix):
"""
Initialize eplb signals.
"""
if self.config.parallel_config.tensor_parallel_rank != 0:
# only TP rank 0 needs to init EPLB signals; rank 0 manages all EPLB signals for all TP ranks
return
self.signal_clear_experts_token_stats_list = []
self.local_experts_token_stats_array_list = []
self.expert_tokens_stats_array_list = []
self.signal_update_weight_from_disk_array_list = []
self.update_weight_from_disk_result_list = []
dp_ipc_signal_suffix = f"{ipc_signal_suffix}_dp{self.config.parallel_config.local_data_parallel_id}"
rearrange_experts_status = np.zeros([1], dtype=np.int32)
self.rearrange_experts_signal = IPCSignal(
name="rearrange_experts_status",
array=rearrange_experts_status,
dtype=np.int32,
suffix=ipc_signal_suffix,
suffix=dp_ipc_signal_suffix,
create=False,
)

@@ -145,14 +149,14 @@ def init_eplb_signals(self, ipc_signal_suffix):
name="rearrange_experts_ips_size",
array=rearrange_experts_ips_size_array,
dtype=np.int32,
suffix=ipc_signal_suffix,
suffix=dp_ipc_signal_suffix,
create=False,
)

self.shm_rearrange_experts_ips_list = IPCSignal(
name="rearrange_experts_ips_list",
shm_size=self.config.eplb_config.redundant_expert_ip_shm_size,
suffix=ipc_signal_suffix,
suffix=dp_ipc_signal_suffix,
create=False,
)

@@ -161,27 +165,19 @@ def init_eplb_signals(self, ipc_signal_suffix):
name="signal_update_weight_from_tensor",
array=signal_update_weight_from_tensor,
dtype=np.int32,
suffix=ipc_signal_suffix,
suffix=dp_ipc_signal_suffix,
create=False,
)

if envs.FD_ENABLE_MULTI_API_SERVER:
engine_worker_suffix = [
self.config.parallel_config.engine_worker_queue_port[
self.config.parallel_config.local_data_parallel_id
]
]
else:
engine_worker_suffix = self.config.parallel_config.engine_worker_queue_port

for suffix_port in engine_worker_suffix:
for tp_rank_id in range(self.config.parallel_config.tensor_parallel_size):
tp_ipc_signal_suffix = f"{dp_ipc_signal_suffix}_tp{tp_rank_id}"
signal_clear_experts_token_stats = np.zeros([1], dtype=np.int32)
self.signal_clear_experts_token_stats_list.append(
IPCSignal(
name="signal_clear_experts_token_stats",
array=signal_clear_experts_token_stats,
dtype=np.int32,
suffix=suffix_port,
suffix=tp_ipc_signal_suffix,
create=False,
)
)
@@ -192,7 +188,7 @@ def init_eplb_signals(self, ipc_signal_suffix):
name="signal_update_weight_from_disk",
array=signal_update_weight_from_disk,
dtype=np.int32,
suffix=suffix_port,
suffix=tp_ipc_signal_suffix,
create=False,
)
)
@@ -203,7 +199,7 @@ def init_eplb_signals(self, ipc_signal_suffix):
name="result_update_weight_from_disk",
array=result_update_weight_from_disk,
dtype=np.int32,
suffix=suffix_port,
suffix=tp_ipc_signal_suffix,
create=False,
)
)
@@ -217,7 +213,7 @@ def init_eplb_signals(self, ipc_signal_suffix):
name="all_experts_token_stats",
array=experts_token_stats,
dtype=np.int32,
suffix=suffix_port,
suffix=tp_ipc_signal_suffix,
create=False,
)
)
@@ -226,7 +222,7 @@ def init_eplb_signals(self, ipc_signal_suffix):
name="local_experts_token_stats",
array=experts_token_stats,
dtype=np.int32,
suffix=suffix_port,
suffix=tp_ipc_signal_suffix,
create=False,
)
)
@@ -541,10 +537,10 @@ async def rearrange_experts(self, request_dict: dict):
status_code = HTTPStatus.UNAUTHORIZED
return content, status_code

if self.config.parallel_config.expert_parallel_rank != 0:
if self.config.parallel_config.tensor_parallel_rank != 0:
content = {
"code": 1,
"msg": f"actual rank {self.config.parallel_config.expert_parallel_rank}, expect rank 0",
"msg": f"actual rank {self.config.parallel_config.tensor_parallel_rank}, expect rank 0",
}
status_code = HTTPStatus.BAD_REQUEST
return content, status_code
@@ -589,6 +585,8 @@ async def rearrange_experts(self, request_dict: dict):
status_code = HTTPStatus.BAD_REQUEST
else:
weight = np.array(request_dict["data"], dtype=np.int32)
api_server_logger.info(f"expert_tokens_stats_array_list: {weight}")

for idx in range(len(self.expert_tokens_stats_array_list)):
self.expert_tokens_stats_array_list[idx].value[:] = weight[:]
self.signal_update_weight_from_disk_array_list[idx].value[0] = 1
@@ -645,10 +643,10 @@ async def get_per_expert_tokens_stats(self, request_dict: dict):
status_code = HTTPStatus.UNAUTHORIZED
return content, status_code

if self.config.parallel_config.expert_parallel_rank != 0:
if self.config.parallel_config.tensor_parallel_rank != 0:
content = {
"code": 1,
"msg": f"actual rank {self.config.parallel_config.expert_parallel_rank}, expect rank 0",
"msg": f"actual rank {self.config.parallel_config.tensor_parallel_rank}, expect rank 0",
}
status_code = HTTPStatus.BAD_REQUEST
return content, status_code
@@ -688,10 +686,10 @@ async def check_redundant(self, request_dict: dict):
status_code = HTTPStatus.UNAUTHORIZED
return content, status_code

if self.config.parallel_config.expert_parallel_rank != 0:
if self.config.parallel_config.tensor_parallel_rank != 0:
content = {
"code": 1,
"msg": f"actual rank {self.config.parallel_config.expert_parallel_rank}, expect rank 0",
"msg": f"actual rank {self.config.parallel_config.tensor_parallel_rank}, expect rank 0",
}
status_code = HTTPStatus.BAD_REQUEST
return content, status_code
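Taken together, the hunks above replace the old per-engine-worker-port signal suffixes with a two-level naming scheme: DP-wide signals (`rearrange_experts_status`, `rearrange_experts_ips_*`, `signal_update_weight_from_tensor`) use `{port}_dp{local_data_parallel_id}`, while per-rank signals (`signal_clear_experts_token_stats`, the update-from-disk request/result, and the token-stats arrays) append `_tp{rank}`, and only TP rank 0 of each DP group opens them on the API-server side. A minimal sketch of the convention, with helper functions and the port value being illustrative rather than part of the PR:

```python
# Illustrative helpers only; the PR builds these f-strings inline in init_eplb_signals.
def dp_suffix(ipc_signal_suffix, local_data_parallel_id):
    """Suffix for IPC signals shared across one data-parallel group."""
    return f"{ipc_signal_suffix}_dp{local_data_parallel_id}"

def tp_suffix(ipc_signal_suffix, local_data_parallel_id, tp_rank_id):
    """Suffix for IPC signals owned by a single tensor-parallel rank."""
    return f"{dp_suffix(ipc_signal_suffix, local_data_parallel_id)}_tp{tp_rank_id}"

# With an illustrative port 8132, DP id 0 and TP ranks 0..3:
#   dp_suffix(8132, 0)    -> "8132_dp0"      (DP-wide signals)
#   tp_suffix(8132, 0, 2) -> "8132_dp0_tp2"  (per-TP-rank signals)
```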
65 changes: 34 additions & 31 deletions fastdeploy/eplb/experts_manager.py
@@ -38,7 +38,7 @@ class RedundantExpertManager:
def __init__(
self,
rank: int = 0,
ep_size: int = 32,
ep_size: int = 64,
fd_config: FDConfig = None,
ipc_signal_suffix: int = 0,
):
@@ -54,6 +54,7 @@ def __init__(
self.num_hidden_layers = self.fd_config.model_config.num_hidden_layers
self.num_logical_experts = self.fd_config.model_config.moe_num_experts
self.ipc_signal_suffix = ipc_signal_suffix
self.local_rank = self.rank % self.fd_config.parallel_config.tensor_parallel_size

self.num_replicas = self.num_logical_experts + self.num_redundant_experts
self.num_groups = self.num_logical_experts
@@ -171,20 +172,21 @@ def listen_rearrange_expert_signal(self):
"""
listen_rearrange_expert_signal
"""
if self.rank == 0:
dp_ipc_signal_suffix = f"{self.ipc_signal_suffix}_dp{self.fd_config.parallel_config.local_data_parallel_id}"
if self.local_rank == 0:
rearrange_experts_ips_size_array = np.zeros([1], dtype=np.int32)
rearrange_experts_ips_size_signal = IPCSignal(
name="rearrange_experts_ips_size",
array=rearrange_experts_ips_size_array,
dtype=np.int32,
suffix=self.ipc_signal_suffix,
suffix=dp_ipc_signal_suffix,
create=False,
)

shm_rearrange_experts_ips_list = IPCSignal(
name="rearrange_experts_ips_list",
shm_size=self.eplb_config.redundant_expert_ip_shm_size,
suffix=self.ipc_signal_suffix,
suffix=dp_ipc_signal_suffix,
create=False,
)

@@ -193,16 +195,25 @@ def listen_rearrange_expert_signal(self):
name="rearrange_experts_status",
array=rearrange_experts_status,
dtype=np.int32,
suffix=self.ipc_signal_suffix,
suffix=dp_ipc_signal_suffix,
create=False,
)
signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32)
self.signal_update_weight_from_tensor_array = IPCSignal(
name="signal_update_weight_from_tensor",
array=signal_update_weight_from_tensor,
dtype=np.int32,
suffix=dp_ipc_signal_suffix,
create=False,
)

tp_ipc_signal_suffix = f"{dp_ipc_signal_suffix}_tp{self.local_rank}"
signal_update_weight_from_disk = np.zeros([1], dtype=np.int32)
signal_update_weight_from_disk_array = IPCSignal(
name="signal_update_weight_from_disk",
array=signal_update_weight_from_disk,
dtype=np.int32,
suffix=self.ipc_signal_suffix,
suffix=tp_ipc_signal_suffix,
create=False,
)

@@ -214,12 +225,21 @@ def listen_rearrange_expert_signal(self):
name="all_experts_token_stats",
array=experts_token_stats,
dtype=np.int32,
suffix=self.ipc_signal_suffix,
suffix=tp_ipc_signal_suffix,
create=False,
)

result_update_weight_from_disk = np.zeros([1], dtype=np.int32)
self.update_weight_from_disk_result = IPCSignal(
name="result_update_weight_from_disk",
array=result_update_weight_from_disk,
dtype=np.int32,
suffix=tp_ipc_signal_suffix,
create=False,
)

while True:
if self.rank == 0:
if self.local_rank == 0:
now = int(time.time())
if rearrange_experts_ips_size_signal.value[0] > 0:
# step 1. all reduce experts token stats
@@ -267,8 +287,8 @@ def caculate_expert_rank_table(self, is_init=False):
eplb_strategy = self.eplb_config.redundant_expert_eplb_strategy
if is_init:
num_groups = 1
num_nodes = 2
num_gpus = 2 * 8
num_nodes = 8
num_gpus = 8 * 8
eplb_strategy = ""
# eplb
rank_expert_list, logical_to_physical_map, expert_count = rebalance_experts(
@@ -291,7 +311,7 @@ def caculate_expert_rank_table(self, is_init=False):
self.model_expert_id_to_ep_rank_array[..., : logical_to_physical_map.shape[-1]] = logical_to_physical_map[:]
self.model_expert_in_rank_num_list[:] = expert_count[:]

if self.rank == 0:
if self.local_rank == 0:
workload = RedundantExpertWorkload()
workload.tokens_per_expert_stats_list = self.model_tokens_per_expert_stats_list.tolist()
workload.ep_rank_to_expert_id_list = rank_expert_list.tolist()
@@ -304,16 +324,7 @@ def update_weight_from_disk(self):
update_weight_from_disk
"""
begin_time = time.time()
result_update_weight_from_disk = np.zeros([1], dtype=np.int32)
update_weight_from_disk_result = IPCSignal(
name="result_update_weight_from_disk",
array=result_update_weight_from_disk,
dtype=np.int32,
suffix=self.ipc_signal_suffix,
create=False,
)
update_weight_from_disk_result.value[0] = 0

self.update_weight_from_disk_result.value[0] = 0
self.logger.info(f"redundant_expert: update_weight_from_disk send to async process, rank {self.rank}")
self.parent_mg_conn.send(
{
@@ -326,7 +337,7 @@ def update_weight_from_disk(self):
self.tensor_infos = response["weights"]

# Update the weight-loading result
update_weight_from_disk_result.value[0] = 1 if response["result"] else -1
self.update_weight_from_disk_result.value[0] = 1 if response["result"] else -1
self.logger.info(
"redundant_expert: update_weight_from_disk end, rank"
+ f" {self.rank} {response['result']}, cost {int(time.time() - begin_time)}s"
@@ -441,15 +452,7 @@ def allreduce_load_weight_result(self):
or not self.eplb_config.redundant_expert_enable_schedule_cordon
):
self.logger.info("redundant_expert: allreduce_load_weight_result success, notify infer.py")
signal_update_weight_from_tensor = np.zeros([1], dtype=np.int32)
signal_update_weight_from_tensor_array = IPCSignal(
name="signal_update_weight_from_tensor",
array=signal_update_weight_from_tensor,
dtype=np.int32,
suffix=self.ipc_signal_suffix,
create=False,
)
signal_update_weight_from_tensor_array.value[0] = 1
self.signal_update_weight_from_tensor_array.value[0] = 1
return True

def allgather_load_weight_result(self):
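On the worker side, `RedundantExpertManager` now keys its behaviour on the rank within the TP group (`local_rank = rank % tensor_parallel_size`) rather than the global rank, and rebuilds the same dp/tp suffixes so that its `IPCSignal` handles pair up with the ones the API server opens. A minimal sketch of that derivation, with the function name and example values being illustrative only:

```python
# Illustrative sketch of how the manager side derives its rank and suffixes.
def eplb_signal_suffixes(rank, tensor_parallel_size, local_data_parallel_id, ipc_signal_suffix):
    # Rank within the TP group; only local_rank 0 listens for the DP-wide
    # rearrange / update-from-tensor signals.
    local_rank = rank % tensor_parallel_size
    dp_suffix = f"{ipc_signal_suffix}_dp{local_data_parallel_id}"
    tp_suffix = f"{dp_suffix}_tp{local_rank}"
    return local_rank, dp_suffix, tp_suffix

print(eplb_signal_suffixes(rank=5, tensor_parallel_size=4, local_data_parallel_id=1, ipc_signal_suffix=8132))
# -> (1, '8132_dp1', '8132_dp1_tp1')
```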