|
| 1 | +# Copyright (c) Microsoft Corporation. |
| 2 | +# Licensed under the MIT license. |
| 3 | + |
| 4 | +"""A module for baseline generation.""" |
| 5 | + |
| 6 | +from copy import deepcopy |
| 7 | +from pathlib import Path |
| 8 | +import json |
| 9 | +import re |
| 10 | + |
| 11 | +from joblib import Parallel, delayed |
| 12 | +import pandas as pd |
| 13 | + |
| 14 | +from superbench.common.utils import logger |
| 15 | +from superbench.analyzer import file_handler |
| 16 | +from superbench.analyzer import data_analysis |
| 17 | +from superbench.analyzer import DataDiagnosis |
| 18 | +from superbench.analyzer import ResultSummary |
| 19 | +from superbench.analyzer.diagnosis_rule_op import RuleOp, DiagnosisRuleType |
| 20 | + |
| 21 | + |
| 22 | +class BaselineGeneration(DataDiagnosis): |
| 23 | + """The class to generate baseline for raw data.""" |
| 24 | + def fix_threshold_outlier_detection(self, data_series, single_metric_with_baseline, metric, rule_op): |
| 25 | + """Fix threshold outlier detection algorithm. |
| 26 | +
|
| 27 | + Step 0: Put all data in the collection. |
| 28 | + Step 1: Regenerate the collection. |
| 29 | + Calculate the average number in the collection as the baseline. |
| 30 | + Remove all data which cannot pass the fix threshold based on the new baseline. |
| 31 | + Step 2: If no data has been removed from Step 1, go to Step 3; otherwise, go to Step 1. |
| 32 | + Step 3: Use the baseline and fix threshold for Outlier Detection. |
| 33 | +
|
| 34 | + Args: |
| 35 | + data_series (pd.Series): data of the metric. |
| 36 | + single_metric_with_baseline (dict): baseline of the single metric in 'metrics' in 2-layer dict format. |
| 37 | + metric (str): the name of the metric to execute the algorithm. |
| 38 | + rule_op (function): diagnosis rule op function. |
| 39 | +
|
| 40 | + Returns: |
| 41 | + tuple: the baseline of the metric, normal data of the metric. |
| 42 | + """ |
| 43 | + if single_metric_with_baseline['metrics'][metric] \ |
| 44 | + is not None and single_metric_with_baseline['metrics'][metric] != -1: |
| 45 | + return single_metric_with_baseline['metrics'][metric] |
| 46 | + tmp_single_metric_with_baseline = deepcopy(single_metric_with_baseline) |
| 47 | + tmp_single_metric_with_baseline['metrics'] = {} |
| 48 | + clean = False |
| 49 | + while clean is False: |
| 50 | + clean = True |
| 51 | + baseline_val = data_series.mean() |
| 52 | + for val in data_series.index: |
| 53 | + tmp_single_metric_with_baseline['metrics'][metric] = baseline_val |
| 54 | + if baseline_val == 0: |
| 55 | + break |
| 56 | + data_row = pd.Series([data_series[val]], index=[metric]) |
| 57 | + details = [] |
| 58 | + categories = set() |
| 59 | + summary_data_row = pd.Series(index=[metric], dtype=float) |
| 60 | + violated_num = rule_op(data_row, tmp_single_metric_with_baseline, summary_data_row, details, categories) |
| 61 | + if violated_num: |
| 62 | + data_series = data_series.drop(val) |
| 63 | + clean = False |
| 64 | + baseline = tmp_single_metric_with_baseline['metrics'][metric] |
| 65 | + return baseline, data_series |
| 66 | + |
| 67 | + def get_aggregate_data(self, raw_data_file, summary_rule_file): |
| 68 | + r"""Aggregate raw data according to the summary rule file. |
| 69 | +
|
| 70 | + If the metric is aggregated by rank (:\d+), remove the rank info to generate the metric name and aggregate data. |
| 71 | + If the metric is aggregated by regex pattern, aggregate the data and copy to all metrics matches this pattern. |
| 72 | +
|
| 73 | + Args: |
| 74 | + raw_data_file (str): the file name of the raw data file. |
| 75 | + summary_rule_file (str): the file name of the summary rule file. |
| 76 | +
|
| 77 | + Returns: |
| 78 | + DataFrame: aggregated data |
| 79 | + """ |
| 80 | + self.rs = ResultSummary() |
| 81 | + rules = self.rs._preprocess(raw_data_file, summary_rule_file) |
| 82 | + # parse rules for result summary |
| 83 | + if not self.rs._parse_rules(rules): |
| 84 | + return |
| 85 | + aggregated_df = pd.DataFrame() |
| 86 | + for rule in self.rs._sb_rules: |
| 87 | + single_metric_rule = self.rs._sb_rules[rule] |
| 88 | + metrics = list(single_metric_rule['metrics'].keys()) |
| 89 | + data_df_of_rule = self.rs._raw_data_df[metrics] |
| 90 | + if self.rs._sb_rules[rule]['aggregate']: |
| 91 | + # if aggregate is True, aggregate in ranks |
| 92 | + if self.rs._sb_rules[rule]['aggregate'] is True: |
| 93 | + data_df_of_rule = data_analysis.aggregate(data_df_of_rule) |
| 94 | + # if aggregate is not empty and is a pattern in regex, aggregate according to pattern |
| 95 | + else: |
| 96 | + pattern = self.rs._sb_rules[rule]['aggregate'] |
| 97 | + data_df_of_rule_with_short_name = data_analysis.aggregate(data_df_of_rule, pattern) |
| 98 | + data_df_of_rule = pd.DataFrame(columns=metrics) |
| 99 | + # restore the columns of data_fd to full metric names |
| 100 | + for metric in metrics: |
| 101 | + short = '' |
| 102 | + match = re.search(pattern, metric) |
| 103 | + if match: |
| 104 | + metric_in_list = list(metric) |
| 105 | + for i in range(1, len(match.groups()) + 1): |
| 106 | + metric_in_list[match.start(i):match.end(i)] = '*' |
| 107 | + short = ''.join(metric_in_list) |
| 108 | + data_df_of_rule[metric] = data_df_of_rule_with_short_name[short] |
| 109 | + aggregated_df = pd.concat([aggregated_df, data_df_of_rule], axis=1) |
| 110 | + return aggregated_df |
| 111 | + |
| 112 | + def generate_baseline(self, algo, aggregated_df, diagnosis_rule_file, baseline): |
| 113 | + """Generate the baseline in json format. |
| 114 | +
|
| 115 | + Args: |
| 116 | + algo (str): the algorithm to generate the baseline. |
| 117 | + aggregated_df (DataFrame): aggregated data. |
| 118 | + diagnosis_rule_file (str): the file name of the diagnosis rules which used in fix_threshold algorithm. |
| 119 | + baseline (dict): existing baseline of some metrics. |
| 120 | +
|
| 121 | + Returns: |
| 122 | + dict: baseline of metrics defined in diagnosis_rule_files for fix_threshold algorithm or |
| 123 | + defined in rule_summary_files for mean. |
| 124 | + """ |
| 125 | + # re-organize metrics by benchmark names |
| 126 | + self._benchmark_metrics_dict = self._get_metrics_by_benchmarks(list(self._raw_data_df.columns)) |
| 127 | + if algo == 'mean': |
| 128 | + mean_df = self._raw_data_df.mean() |
| 129 | + for metric in self._raw_data_df.columns: |
| 130 | + if metric in baseline: |
| 131 | + return baseline[metric] |
| 132 | + baseline[metric] = mean_df[metric] |
| 133 | + elif algo == 'fix_threshold': |
| 134 | + # read diagnosis rules |
| 135 | + rules = file_handler.read_rules(diagnosis_rule_file) |
| 136 | + if not self._parse_rules_and_baseline(rules, baseline): |
| 137 | + return baseline |
| 138 | + else: |
| 139 | + for rule in self._sb_rules: |
| 140 | + single_metric_rule = self._sb_rules[rule] |
| 141 | + metrics = list(single_metric_rule['metrics'].keys()) |
| 142 | + function_name = self._sb_rules[rule]['function'] |
| 143 | + rule_op = RuleOp.get_rule_func(DiagnosisRuleType(function_name)) |
| 144 | + outputs = Parallel(n_jobs=-1)( |
| 145 | + delayed(self.fix_threshold_outlier_detection) |
| 146 | + (aggregated_df[metric], single_metric_rule, metric, rule_op) for metric in metrics |
| 147 | + ) |
| 148 | + for index, out in enumerate(outputs): |
| 149 | + baseline[metrics[index]] = out[0] |
| 150 | + aggregated_df[metrics[index]] = out[1] |
| 151 | + return baseline |
| 152 | + |
| 153 | + def run( |
| 154 | + self, raw_data_file, summary_rule_file, diagnosis_rule_file, pre_baseline_file, algorithm, output_dir, digit=2 |
| 155 | + ): |
| 156 | + """Export baseline to json file. |
| 157 | +
|
| 158 | + Args: |
| 159 | + raw_data_file (str): Path to raw data jsonl file. |
| 160 | + summary_rule_file (str): the file name of the summary rule file. |
| 161 | + diagnosis_rule_file (str): the file name of the diagnosis rules which used in fix_threshold algorithm. |
| 162 | + pre_baseline_file (str): the file name of the previous baseline file. |
| 163 | + algorithm (str): the algorithm to generate the baseline. |
| 164 | + output_dir (str): the directory to save the baseline file. |
| 165 | + digit (int): the number of digits after the decimal point. |
| 166 | + """ |
| 167 | + try: |
| 168 | + # aggregate results from different devices |
| 169 | + self._raw_data_df = self.get_aggregate_data(raw_data_file, summary_rule_file) |
| 170 | + # read existing baseline |
| 171 | + baseline = {} |
| 172 | + if pre_baseline_file: |
| 173 | + baseline = file_handler.read_baseline(pre_baseline_file) |
| 174 | + # generate baseline accordint to rules in diagnosis and fix threshold outlier detection method |
| 175 | + baseline = self.generate_baseline(algorithm, self._raw_data_df, diagnosis_rule_file, baseline) |
| 176 | + for metric in baseline: |
| 177 | + val = baseline[metric] |
| 178 | + if metric in self._raw_data_df: |
| 179 | + if isinstance(self._raw_data_df[metric].iloc[0], float): |
| 180 | + baseline[metric] = f'%.{digit}g' % val if abs(val) < 1 else f'%.{digit}f' % val |
| 181 | + elif isinstance(self._raw_data_df[metric].iloc[0], int): |
| 182 | + baseline[metric] = int(val) |
| 183 | + else: |
| 184 | + try: |
| 185 | + baseline[metric] = float(val) |
| 186 | + except Exception as e: |
| 187 | + logger.error('Analyzer: {} baseline is not numeric, msg: {}'.format(metric, str(e))) |
| 188 | + baseline = json.dumps(baseline, indent=2, sort_keys=True) |
| 189 | + baseline = re.sub(r': \"(\d+.?\d*)\"', r': \1', baseline) |
| 190 | + with (Path(output_dir) / 'baseline.json').open('w') as f: |
| 191 | + f.write(baseline) |
| 192 | + |
| 193 | + except Exception as e: |
| 194 | + logger.error('Analyzer: generate baseline failed, msg: {}'.format(str(e))) |
0 commit comments