From ed0dd61ecb3f9ba41d63ff8b709a3e1a83eff45d Mon Sep 17 00:00:00 2001 From: liuhaoyang Date: Tue, 28 Dec 2021 09:47:38 +0800 Subject: [PATCH] Add rateps aggregator (#106) * Use the new data specification to handle span_layer * Add rateps aggregator * Fix second interval * Fix build error * Remove gc log * Fix expression null exception * Fix RATEPS --- .../aggregators/DefaultAccumulator.java | 2 +- .../aggregators/FunctionAggregatorDefine.java | 2 + .../FunctionAggregatorFactory.java | 6 +- .../aggregators/RatepsFunctionAggregator.java | 59 +++++++++++++++++++ .../MetricAggregateProcessFunction.java | 31 +++++----- 5 files changed, 83 insertions(+), 17 deletions(-) create mode 100644 analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/RatepsFunctionAggregator.java diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/DefaultAccumulator.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/DefaultAccumulator.java index bfee0d8..f3e914e 100644 --- a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/DefaultAccumulator.java +++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/DefaultAccumulator.java @@ -57,7 +57,7 @@ public boolean initialized() { @Override public FunctionAggregator getMetricAggregator(ExpressionFunction function) { - return aggregators.computeIfAbsent(function.getField() + "_" + function.getAggregator(), k -> create(function)); + return aggregators.computeIfAbsent(function.getField() + "_" + function.getAggregator(), k -> create(expression, function)); } @Override diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorDefine.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorDefine.java index 6107ec1..ccef206 100644 --- a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorDefine.java +++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorDefine.java @@ -51,4 +51,6 @@ public class FunctionAggregatorDefine { static final String MEDIAN = "median"; static final String MEAN = "mean"; + + static final String RATEPS = "rateps"; } diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorFactory.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorFactory.java index 901fc9d..15bbd50 100644 --- a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorFactory.java +++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/FunctionAggregatorFactory.java @@ -14,6 +14,7 @@ package cloud.erda.analyzer.runtime.expression.functions.aggregators; +import cloud.erda.analyzer.runtime.models.Expression; import cloud.erda.analyzer.runtime.models.ExpressionFunction; import cloud.erda.analyzer.runtime.models.ExpressionFunctionTrigger; import lombok.extern.slf4j.Slf4j; @@ -27,7 +28,7 @@ @Slf4j public class FunctionAggregatorFactory { - public static FunctionAggregator create(ExpressionFunction function) { + public static FunctionAggregator create(Expression expression, ExpressionFunction function) { FunctionAggregator aggregator = null; @@ -70,6 +71,9 @@ public static FunctionAggregator create(ExpressionFunction function) { case MEDIAN: aggregator = new MedianFunctionAggregator(); break; + case RATEPS: + aggregator = new RatepsFunctionAggregator(expression.getWindow()); + break; default: /** * 如果用户定义了未实现的聚合器,默认使用空聚合器,业务容错避免抛出异常影响其他的表达式执行 diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/RatepsFunctionAggregator.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/RatepsFunctionAggregator.java new file mode 100644 index 0000000..0f7b6be --- /dev/null +++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/expression/functions/aggregators/RatepsFunctionAggregator.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 Terminus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cloud.erda.analyzer.runtime.expression.functions.aggregators; + +import cloud.erda.analyzer.common.utils.ConvertUtils; + +/** + * @author liuhaoyang + * @date 2021/12/27 10:59 + */ +public class RatepsFunctionAggregator implements FunctionAggregator { + + private final long interval; + private double sum; + + public RatepsFunctionAggregator(long window) { + sum = 0d; + interval = window * 60; + } + + @Override + public String aggregator() { + return FunctionAggregatorDefine.RATEPS; + } + + @Override + public Object value() { + return sum / interval; + } + + @Override + public void apply(Object value) { + Double d = ConvertUtils.toDouble(value); + if (d != null) { + sum += d; + } + } + + @Override + public void merge(FunctionAggregator other) { + if (other instanceof RatepsFunctionAggregator) { + this.sum += ((RatepsFunctionAggregator) other).sum; + } + } +} diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAggregateProcessFunction.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAggregateProcessFunction.java index e8ab318..cb8d2d5 100644 --- a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAggregateProcessFunction.java +++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAggregateProcessFunction.java @@ -48,21 +48,6 @@ public Accumulator createAccumulator() { @Override public Accumulator add(KeyedMetricEvent value, Accumulator accumulator) { - /** - * 执行表达式中 function 的 aggregator - */ - for (ExpressionFunction function : value.getExpression().getFunctions()) { - if (!ExpressionFunctionTrigger.applied.equals(function.getTrigger())) { - continue; - } - FunctionAggregator aggregator = accumulator.getMetricAggregator(function); - if (!(aggregator instanceof AppliedFunctionAggregator)) { - aggregator = new AppliedFunctionAggregator(aggregator); - } - Object v = getFieldValue(value.getMetric(), value.getMetadataId(), function); - aggregator.apply(v); - accumulator.setFunction(aggregator, function); - } if (!accumulator.initialized()) { accumulator.setMetadataId(value.getMetadataId()); @@ -82,6 +67,22 @@ public Accumulator add(KeyedMetricEvent value, Accumulator accumulator) { accumulator.setMetric(value.getMetric()); accumulator.setKey(value.getKey()); + /** + * 执行表达式中 function 的 aggregator + */ + for (ExpressionFunction function : value.getExpression().getFunctions()) { + if (!ExpressionFunctionTrigger.applied.equals(function.getTrigger())) { + continue; + } + FunctionAggregator aggregator = accumulator.getMetricAggregator(function); + if (!(aggregator instanceof AppliedFunctionAggregator)) { + aggregator = new AppliedFunctionAggregator(aggregator); + } + Object v = getFieldValue(value.getMetric(), value.getMetadataId(), function); + aggregator.apply(v); + accumulator.setFunction(aggregator, function); + } + return accumulator; }