Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions byte_micro_perf/README_MT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
## Quickstart

### Prepare running environment

```
git clone https://github.com/bytedance/ByteMLPerf.git
cd ByteMLPerf/byte_micro_perf
```

### Prepare hardware configuration (optional)
Follow the existing style in the `ByteMLPerf/vendor_zoo` directory to create a hardware config file for your own heterogeneous hardware. This helps the framework evaluate operator performance on new hardware more precisely.

### An example

```
python3 launch.py --task exp --hardware_type MTGPU
```
#### Usage
```
--task: operator name. For a new operator, create a workload file following the existing style in byte_micro_perf/workloads.

--hardware_type: hardware category name. For new heterogeneous hardware, derive a Backend class in byte_micro_perf/backends.
```
187 changes: 187 additions & 0 deletions byte_micro_perf/backends/MTGPU/backend_mtgpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright 2023 ByteDance and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import math
import os
from datetime import timedelta
from typing import Any, Dict, List

import torch
import torch_musa
import torch.distributed as dist
import torch.distributed.distributed_c10d as dist_c10d
from backends.backend import Backend
from backends.module_store import *

# Configure the root logger at INFO level at import time (basicConfig is a
# no-op if the root logger already has handlers installed).
logging.basicConfig(level=logging.INFO)
# Module-wide logger, registered under the name "PerfEngine".
log = logging.getLogger("PerfEngine")


class BackendMTGPU(Backend):
    """Backend for MTGPU devices accessed through the ``torch.musa`` API.

    Each operator method (``gemm``, ``add``, ...) installs the matching
    operator object from ``backends.module_store`` on ``self.op``. The
    collective operators additionally (re)build the process group first.
    """

    def get_device_name(self) -> str:
        """Return the name reported by ``torch.musa`` for device 0."""
        return torch.musa.get_device_name(0)

    def get_backend_properties(self) -> None:
        """Populate ``memory_limit`` and, when available, ``bandwidth_limit``.

        ``memory_limit`` is the total memory of device 0 in whole GiB.
        ``hw_info_dict`` and ``bandwidth_limit`` are only set when
        ``self.vendor_path`` points to an existing ``.json`` file; otherwise a
        warning is logged and they are left untouched.
        """
        self.memory_limit = int(
            torch.musa.get_device_properties(0).total_memory / (1024**3)
        )

        if os.path.exists(self.vendor_path) and self.vendor_path.endswith(".json"):
            # The vendor file uses non-ASCII keys, so decode it explicitly as
            # UTF-8 instead of relying on the locale's default encoding.
            with open(self.vendor_path, "r", encoding="utf-8") as f:
                self.hw_info_dict = json.load(f)
            # If the vendor path does not exist, set this param manually.
            self.bandwidth_limit = self.hw_info_dict["内存参数"]["内存"][
                "内存带宽(GB/s)"
            ]
        else:
            log.warning(
                "Vendor_path: [ {} ] was not found or not a full path points to json, please check your path!!! Otherwise, please set the hardware info manaually.".format(
                    self.vendor_path
                )
            )

    # ------------------------------------------------------------------
    # Single-device operators: each one only selects the operator object.
    # ------------------------------------------------------------------

    def gemm(self) -> None:
        """Select the GEMM operator."""
        self.op = GemmOp()

    def add(self) -> None:
        """Select the add operator."""
        self.op = AddOp()

    def sin(self) -> None:
        """Select the sin operator."""
        self.op = SinOp()

    def cos(self) -> None:
        """Select the cos operator."""
        self.op = CosOp()

    def exp(self) -> None:
        """Select the exp operator."""
        self.op = ExpOp()

    def exponential(self) -> None:
        """Select the exponential operator."""
        self.op = ExponentialOp()

    def gelu(self) -> None:
        """Select the GELU operator."""
        self.op = GeluOp()

    def sort(self) -> None:
        """Select the sort operator."""
        self.op = SortOp()

    def unique(self) -> None:
        """Select the unique operator."""
        self.op = UniqueOp()

    def indexadd(self) -> None:
        """Select the index-add operator."""
        self.op = IndexAddOp()

    def softmax(self) -> None:
        """Select the softmax operator."""
        self.op = SoftmaxOp()

    def layernorm(self) -> None:
        """Select the layer-norm operator."""
        self.op = LayerNormOp()

    # ------------------------------------------------------------------
    # Collective operators: rebuild the process group, then bind the op to it.
    # NOTE(review): initialize_ccl() already calls setup_2d_group(), so each of
    # these creates one more group per call -- confirm that this is intended.
    # ------------------------------------------------------------------

    def allreduce(self) -> None:
        """Build the process group and select the all-reduce operator."""
        self.setup_2d_group()
        self.op = AllReduceOp(self.group)

    def allgather(self) -> None:
        """Build the process group and select the all-gather operator."""
        self.setup_2d_group()
        self.op = AllGatherOp(self.group)

    def reducescatter(self) -> None:
        """Build the process group and select the reduce-scatter operator."""
        self.setup_2d_group()
        self.op = ReduceScatterOp(self.group)

    def alltoall(self) -> None:
        """Build the process group and select the all-to-all operator."""
        self.setup_2d_group()
        self.op = AllToAllOp(self.group)

    def broadcast(self) -> None:
        """Build the process group and select the broadcast operator."""
        self.setup_2d_group()
        self.op = BroadcastOp(self.group)

    def host2device(self) -> None:
        """Select the host-to-device copy operator targeting the musa device."""
        self.op = Host2DeviceOp(torch.device("musa"))

    def device2host(self) -> None:
        """Select the device-to-host copy operator."""
        self.op = Device2HostOp()

    def build_tensor(self, input_shapes, dtype):
        """Create the input tensor sets used for one benchmark case.

        Args:
            input_shapes: iterable of shapes; one tensor is created per shape.
            dtype: name of a ``torch`` dtype attribute, e.g. ``"float32"``.

        Returns:
            ``(input_tensors_list, data_cnt)`` where ``input_tensors_list``
            holds ``data_cnt`` independent input sets. Each set is the list of
            tensors, or whatever ``self.op.process_inputs`` returns for them
            when the operator defines that hook.
        """
        torch_type = getattr(torch, dtype)
        # element_size() is valid for every dtype; the finfo/iinfo split used
        # previously raised TypeError for integer dtypes other than int32.
        dtype_size = torch.empty((), dtype=torch_type).element_size()

        size = sum(math.prod(shape) for shape in input_shapes)
        # Factor of 2 presumably accounts for outputs -- TODO confirm.
        data_amount = size * 2 * dtype_size
        # Keep 4 GiB of device memory out of the budget.
        budget_bytes = (self.memory_limit - 4) * 1024**3
        if data_amount > 0:
            data_cnt = budget_bytes // data_amount
        else:
            # Zero-element inputs consume no memory; only iterations bound us.
            data_cnt = self.iterations
        # Always build at least one input set, even on devices with <= 4 GiB,
        # so callers never receive an empty list / non-positive count.
        data_cnt = max(1, min(data_cnt, self.iterations))

        device = torch.device("musa")
        input_tensors_list = [
            [
                torch.randn(shape).type(torch_type).to(device)
                for shape in input_shapes
            ]
            for _ in range(data_cnt)
        ]

        if hasattr(self.op, "process_inputs"):
            input_tensors_list = [
                self.op.process_inputs(*input_tensors)
                for input_tensors in input_tensors_list
            ]

        return input_tensors_list, data_cnt

    def _run_operation(self, operation, inputs):
        """Call ``operation`` with ``inputs`` unpacked and return its result."""
        return operation(*inputs)

    def device_synchronize(self) -> bool:
        """Block until queued musa work has finished; always returns True."""
        torch.musa.synchronize()
        return True

    def initialize_ccl(self, rank, world_size):
        """Initialize distributed process groups and the relevant env vars.

        Sets the rendezvous environment variables for a single-host run,
        selects device ``rank``, initializes the default process group with
        the ``mccl`` backend and then builds ``self.group``.

        Args:
            rank: rank of this process; also used as the device index.
            world_size: total number of participating processes.
        """
        os.environ["MASTER_ADDR"] = "127.0.0.1"
        os.environ["MASTER_PORT"] = "49373"
        os.environ["LOCAL_RANK"] = str(rank)
        os.environ["RANK"] = str(rank)
        os.environ["WORLD_SIZE"] = str(world_size)

        torch.musa.set_device(rank)
        # Timeout (seconds) can be overridden through the environment.
        timeout_seconds = int(os.environ.get("MEGATRON_MCCL_TIMEOUT_SECOND", 30))
        torch.distributed.init_process_group(
            backend="mccl",
            world_size=world_size,
            rank=rank,
            store=None,
            timeout=timedelta(seconds=timeout_seconds),
        )
        self.setup_2d_group()
        log.warning("DIST: rank {}, world_size {}".format(rank, world_size))

    def setup_2d_group(self):
        """Create a process group spanning all ranks and store it on ``self``.

        Sets ``self.rank``, ``self.world_size`` and ``self.group``, then waits
        on a barrier so every rank has finished creating the group.
        """
        self.rank = dist.get_rank()
        torch.musa.set_device(self.rank)
        self.world_size = dist.get_world_size()
        ranks = range(0, self.world_size)

        # Temporarily replace the store-based barrier with a no-op while the
        # group is created (presumably to skip that barrier inside new_group).
        # try/finally guarantees the global hook is restored even if
        # new_group raises.
        origin_store_based_barrier = dist_c10d._store_based_barrier
        dist_c10d._store_based_barrier = lambda *a, **kw: None
        try:
            # The group covers every rank, so this process is always a member.
            self.group = dist.new_group(ranks)
        finally:
            dist_c10d._store_based_barrier = origin_store_based_barrier

        # Wait for all ranks to finish group initialization.
        torch.distributed.barrier()
1 change: 1 addition & 0 deletions byte_micro_perf/backends/MTGPU/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
torch==2.0.0
102 changes: 55 additions & 47 deletions byte_micro_perf/core/perf_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,13 @@ def start_engine(self) -> None:
output_dir = os.path.abspath("reports/" + self.backend_type)
os.makedirs(output_dir, exist_ok=True)

if self.args.task in ["allreduce", "allgather", "reducescatter", "alltoall", "broadcast"]:
if self.args.task in [
"allreduce",
"allgather",
"reducescatter",
"alltoall",
"broadcast",
]:
for group in self.workload["group"]:
mp.spawn(fn=self.init_process, args=(group,), nprocs=group)
else:
Expand Down Expand Up @@ -187,7 +193,9 @@ def start_perf(self, workload: Dict[str, Any]) -> bool:
reports = self.backend.perf(input_shape, dtype)
except Exception as e:
traceback.print_exc()
log.error(f"Execute op: {op_name.lower()} failed, input_shape: {input_shape}, dtype: {dtype}, error msg: {e}")
log.error(
f"Execute op: {op_name.lower()} failed, input_shape: {input_shape}, dtype: {dtype}, error msg: {e}"
)
reports = {}
perf_reports.append(reports)
base_report["Performance"] = perf_reports
Expand Down Expand Up @@ -227,51 +235,51 @@ def activate_venv(self, hardware_type: str) -> bool:

venv_dir = os.path.join("backends", hardware_type + "/venv")
activate_file = os.path.join(venv_dir, "bin", "activate_this.py")
if not os.path.exists(venv_dir):
log.info("venv not exist, Creating Virtual Env for " + hardware_type)

virtualenv.create_environment(venv_dir, True)

exec(open(activate_file).read(), {"__file__": activate_file})
python_path = os.path.join(venv_dir, "bin", "python3")
subprocess.call(
[python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"]
)
subprocess.call(
[
python_path,
"-m",
"pip",
"install",
"-r",
"backends/" + hardware_type + "/requirements.txt",
"-q",
]
)
else:
exec(open(activate_file).read(), {"__file__": activate_file})
"""
just in case install failed in pre-run.
"""
python_path = os.path.join(venv_dir, "bin", "python3")
subprocess.call(
[python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"]
)
subprocess.call(
[
python_path,
"-m",
"pip",
"install",
"-r",
"backends/" + hardware_type + "/requirements.txt",
"-q",
]
)

if not hasattr(sys, "real_prefix"):
return False
return True
# if not os.path.exists(venv_dir):
# log.info("venv not exist, Creating Virtual Env for " + hardware_type)
#
# virtualenv.create_environment(venv_dir, True)
#
# exec(open(activate_file).read(), {"__file__": activate_file})
# python_path = os.path.join(venv_dir, "bin", "python3")
# subprocess.call(
# [python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"]
# )
# subprocess.call(
# [
# python_path,
# "-m",
# "pip",
# "install",
# "-r",
# "backends/" + hardware_type + "/requirements.txt",
# "-q",
# ]
# )
# else:
# exec(open(activate_file).read(), {"__file__": activate_file})
# """
# just in case install failed in pre-run.
# """
# python_path = os.path.join(venv_dir, "bin", "python3")
# subprocess.call(
# [python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"]
# )
# subprocess.call(
# [
# python_path,
# "-m",
# "pip",
# "install",
# "-r",
# "backends/" + hardware_type + "/requirements.txt",
# "-q",
# ]
# )
#
# if not hasattr(sys, "real_prefix"):
# return False
# return True
return True

def deactivate_venv(self):
Expand Down