diff --git a/byte_micro_perf/README_MT.md b/byte_micro_perf/README_MT.md new file mode 100644 index 000000000..e48fdb9b7 --- /dev/null +++ b/byte_micro_perf/README_MT.md @@ -0,0 +1,23 @@ +## Quickstart + +### Prepare running environment + +``` +git clone https://github.com/bytedance/ByteMLPerf.git +cd ByteMLPerf/byte_micro_perf +``` + +### Prepare hardware configuration(optional) +Please follow the given style at `ByteMLPerf/vendor_zoo` directory to create a new hardware config file for your own heterogeneous hardware. Because this helps the framework evaluate operator performance on new hardware more precisely. + +### An example + +``` +python3 launch.py --task exp --hardware_type MTGPU +``` +#### Usage +``` +--task: operator name please create a workload file for new operators by following the existing style in byte_micro_perf/workloads. + +--hardware_type: hardware category name please derive a Backend class for your heterogeneous hardware in byte_micro_perf/backends. +``` \ No newline at end of file diff --git a/byte_micro_perf/backends/MTGPU/backend_mtgpu.py b/byte_micro_perf/backends/MTGPU/backend_mtgpu.py new file mode 100644 index 000000000..e0adeab70 --- /dev/null +++ b/byte_micro_perf/backends/MTGPU/backend_mtgpu.py @@ -0,0 +1,187 @@ +# Copyright 2023 ByteDance and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import math +import os +from datetime import timedelta +from typing import Any, Dict, List + +import torch +import torch_musa +import torch.distributed as dist +import torch.distributed.distributed_c10d as dist_c10d +from backends.backend import Backend +from backends.module_store import * + +logging.basicConfig(level=logging.INFO) +log = logging.getLogger("PerfEngine") + + +class BackendMTGPU(Backend): + def get_device_name(self) -> str: + return torch.musa.get_device_name(0) + + def get_backend_properties(self) -> None: + self.memory_limit = int( + torch.musa.get_device_properties(0).total_memory / (1024**3) + ) + + if os.path.exists(self.vendor_path) and (self.vendor_path).endswith(".json"): + with open(self.vendor_path, "r") as f: + self.hw_info_dict = json.load(f) + # if the vendor path does not exist, please set this param manaually + self.bandwidth_limit = self.hw_info_dict["内存参数"]["内存"][ + "内存带宽(GB/s)" + ] + else: + log.warning( + "Vendor_path: [ {} ] was not found or not a full path points to json, please check your path!!! Otherwise, please set the hardware info manaually.".format( + self.vendor_path + ) + ) + + def gemm(self) -> None: + self.op = GemmOp() + + def add(self) -> None: + self.op = AddOp() + + def sin(self) -> None: + self.op = SinOp() + + def cos(self) -> None: + self.op = CosOp() + + def exp(self) -> None: + self.op = ExpOp() + + def exponential(self) -> None: + self.op = ExponentialOp() + + def gelu(self) -> None: + self.op = GeluOp() + + def sort(self) -> None: + self.op = SortOp() + + def unique(self) -> None: + self.op = UniqueOp() + + def indexadd(self) -> None: + self.op = IndexAddOp() + + def softmax(self) -> None: + self.op = SoftmaxOp() + + def layernorm(self) -> None: + self.op = LayerNormOp() + + def allreduce(self) -> None: + self.setup_2d_group() + self.op = AllReduceOp(self.group) + + def allgather(self) -> None: + self.setup_2d_group() + self.op = AllGatherOp(self.group) + + def reducescatter(self) -> None: + self.setup_2d_group() + self.op = ReduceScatterOp(self.group) + + def alltoall(self) -> None: + self.setup_2d_group() + self.op = AllToAllOp(self.group) + + def broadcast(self) -> None: + self.setup_2d_group() + self.op = BroadcastOp(self.group) + + def host2device(self) -> None: + self.op = Host2DeviceOp(torch.device("musa")) + + def device2host(self) -> None: + self.op = Device2HostOp() + + def build_tensor(self, input_shapes, dtype): + torch_type = getattr(torch, dtype) + if torch_type == torch.int32: + dtype_size = torch.iinfo(torch_type).bits // 8 + else: + dtype_size = torch.finfo(torch_type).bits // 8 + size = sum([math.prod(shape) for shape in input_shapes]) + data_amount = size * 2 * dtype_size + data_cnt = (self.memory_limit - 4) * 1024**3 // data_amount + data_cnt = min(data_cnt, self.iterations) + input_tensors_list = [] + for _ in range(data_cnt): + input_tensors = [ + torch.randn(shape).type(torch_type).to(torch.device("musa")) + for shape in input_shapes + ] + input_tensors_list.append(input_tensors) + + if hasattr(self.op, "process_inputs"): + input_tensors_list = [ + self.op.process_inputs(*(input_tensor)) + for input_tensor in input_tensors_list + ] + + return input_tensors_list, data_cnt + + def _run_operation(self, operation, inputs): + result = operation(*inputs) + return result + + def device_synchronize(self) -> bool: + torch.musa.synchronize() + return True + + def initialize_ccl(self, rank, world_size): + """ + initialize distributed process groups and relevant ENVs + """ + os.environ["MASTER_ADDR"] = "127.0.0.1" + os.environ["MASTER_PORT"] = "49373" + os.environ["LOCAL_RANK"] = str(rank) + os.environ["RANK"] = str(rank) + os.environ["WORLD_SIZE"] = str(world_size) + + torch.musa.set_device(rank) + # Call the init process + timeout_seconds = int(os.environ.get("MEGATRON_MCCL_TIMEOUT_SECOND", 30)) + torch.distributed.init_process_group( + backend="mccl", + world_size=world_size, + rank=rank, + store=None, + timeout=timedelta(seconds=timeout_seconds), + ) + self.setup_2d_group() + log.warning("DIST: rank {}, world_size {}".format(rank, world_size)) + + def setup_2d_group(self): + self.rank = dist.get_rank() + torch.musa.set_device(self.rank) + origin_store_based_barrier = dist_c10d._store_based_barrier + dist_c10d._store_based_barrier = lambda *a, **kw: None + self.world_size = dist.get_world_size() + ranks = range(0, self.world_size) + group = dist.new_group(ranks) + if self.rank in ranks: + self.group = group + dist_c10d._store_based_barrier = origin_store_based_barrier + # wait for all ranks finish group initializing + torch.distributed.barrier() diff --git a/byte_micro_perf/backends/MTGPU/requirements.txt b/byte_micro_perf/backends/MTGPU/requirements.txt new file mode 100644 index 000000000..29b413711 --- /dev/null +++ b/byte_micro_perf/backends/MTGPU/requirements.txt @@ -0,0 +1 @@ +torch==2.0.0 \ No newline at end of file diff --git a/byte_micro_perf/core/perf_engine.py b/byte_micro_perf/core/perf_engine.py index d0ac2ec6c..f73e28dd1 100644 --- a/byte_micro_perf/core/perf_engine.py +++ b/byte_micro_perf/core/perf_engine.py @@ -134,7 +134,13 @@ def start_engine(self) -> None: output_dir = os.path.abspath("reports/" + self.backend_type) os.makedirs(output_dir, exist_ok=True) - if self.args.task in ["allreduce", "allgather", "reducescatter", "alltoall", "broadcast"]: + if self.args.task in [ + "allreduce", + "allgather", + "reducescatter", + "alltoall", + "broadcast", + ]: for group in self.workload["group"]: mp.spawn(fn=self.init_process, args=(group,), nprocs=group) else: @@ -187,7 +193,9 @@ def start_perf(self, workload: Dict[str, Any]) -> bool: reports = self.backend.perf(input_shape, dtype) except Exception as e: traceback.print_exc() - log.error(f"Execute op: {op_name.lower()} failed, input_shape: {input_shape}, dtype: {dtype}, error msg: {e}") + log.error( + f"Execute op: {op_name.lower()} failed, input_shape: {input_shape}, dtype: {dtype}, error msg: {e}" + ) reports = {} perf_reports.append(reports) base_report["Performance"] = perf_reports @@ -227,51 +235,51 @@ def activate_venv(self, hardware_type: str) -> bool: venv_dir = os.path.join("backends", hardware_type + "/venv") activate_file = os.path.join(venv_dir, "bin", "activate_this.py") - if not os.path.exists(venv_dir): - log.info("venv not exist, Creating Virtual Env for " + hardware_type) - - virtualenv.create_environment(venv_dir, True) - - exec(open(activate_file).read(), {"__file__": activate_file}) - python_path = os.path.join(venv_dir, "bin", "python3") - subprocess.call( - [python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"] - ) - subprocess.call( - [ - python_path, - "-m", - "pip", - "install", - "-r", - "backends/" + hardware_type + "/requirements.txt", - "-q", - ] - ) - else: - exec(open(activate_file).read(), {"__file__": activate_file}) - """ - just in case install failed in pre-run. - """ - python_path = os.path.join(venv_dir, "bin", "python3") - subprocess.call( - [python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"] - ) - subprocess.call( - [ - python_path, - "-m", - "pip", - "install", - "-r", - "backends/" + hardware_type + "/requirements.txt", - "-q", - ] - ) - - if not hasattr(sys, "real_prefix"): - return False - return True + # if not os.path.exists(venv_dir): + # log.info("venv not exist, Creating Virtual Env for " + hardware_type) + # + # virtualenv.create_environment(venv_dir, True) + # + # exec(open(activate_file).read(), {"__file__": activate_file}) + # python_path = os.path.join(venv_dir, "bin", "python3") + # subprocess.call( + # [python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"] + # ) + # subprocess.call( + # [ + # python_path, + # "-m", + # "pip", + # "install", + # "-r", + # "backends/" + hardware_type + "/requirements.txt", + # "-q", + # ] + # ) + # else: + # exec(open(activate_file).read(), {"__file__": activate_file}) + # """ + # just in case install failed in pre-run. + # """ + # python_path = os.path.join(venv_dir, "bin", "python3") + # subprocess.call( + # [python_path, "-m", "pip", "install", "--upgrade", "pip", "--quiet"] + # ) + # subprocess.call( + # [ + # python_path, + # "-m", + # "pip", + # "install", + # "-r", + # "backends/" + hardware_type + "/requirements.txt", + # "-q", + # ] + # ) + # + # if not hasattr(sys, "real_prefix"): + # return False + # return True return True def deactivate_venv(self):