Skip to content

Commit

Permalink
Add rateps aggregator (#106)
Browse files Browse the repository at this point in the history
* Use the new data specification to handle span_layer

* Add rateps aggregator

* Fix second interval

* Fix build error

* Remove gc log

* Fix expression null exception

* Fix RATEPS
  • Loading branch information
liuhaoyang authored Dec 28, 2021
1 parent da61e63 commit ed0dd61
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public boolean initialized() {

@Override
public FunctionAggregator getMetricAggregator(ExpressionFunction function) {
return aggregators.computeIfAbsent(function.getField() + "_" + function.getAggregator(), k -> create(function));
return aggregators.computeIfAbsent(function.getField() + "_" + function.getAggregator(), k -> create(expression, function));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ public class FunctionAggregatorDefine {
static final String MEDIAN = "median";

static final String MEAN = "mean";

static final String RATEPS = "rateps";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package cloud.erda.analyzer.runtime.expression.functions.aggregators;

import cloud.erda.analyzer.runtime.models.Expression;
import cloud.erda.analyzer.runtime.models.ExpressionFunction;
import cloud.erda.analyzer.runtime.models.ExpressionFunctionTrigger;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -27,7 +28,7 @@
@Slf4j
public class FunctionAggregatorFactory {

public static FunctionAggregator create(ExpressionFunction function) {
public static FunctionAggregator create(Expression expression, ExpressionFunction function) {

FunctionAggregator aggregator = null;

Expand Down Expand Up @@ -70,6 +71,9 @@ public static FunctionAggregator create(ExpressionFunction function) {
case MEDIAN:
aggregator = new MedianFunctionAggregator();
break;
case RATEPS:
aggregator = new RatepsFunctionAggregator(expression.getWindow());
break;
default:
/**
* 如果用户定义了未实现的聚合器,默认使用空聚合器,业务容错避免抛出异常影响其他的表达式执行
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2021 Terminus, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cloud.erda.analyzer.runtime.expression.functions.aggregators;

import cloud.erda.analyzer.common.utils.ConvertUtils;

/**
* @author liuhaoyang
* @date 2021/12/27 10:59
*/
public class RatepsFunctionAggregator implements FunctionAggregator {

private final long interval;
private double sum;

public RatepsFunctionAggregator(long window) {
sum = 0d;
interval = window * 60;
}

@Override
public String aggregator() {
return FunctionAggregatorDefine.RATEPS;
}

@Override
public Object value() {
return sum / interval;
}

@Override
public void apply(Object value) {
Double d = ConvertUtils.toDouble(value);
if (d != null) {
sum += d;
}
}

@Override
public void merge(FunctionAggregator other) {
if (other instanceof RatepsFunctionAggregator) {
this.sum += ((RatepsFunctionAggregator) other).sum;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,6 @@ public Accumulator createAccumulator() {

@Override
public Accumulator add(KeyedMetricEvent value, Accumulator accumulator) {
/**
* 执行表达式中 function 的 aggregator
*/
for (ExpressionFunction function : value.getExpression().getFunctions()) {
if (!ExpressionFunctionTrigger.applied.equals(function.getTrigger())) {
continue;
}
FunctionAggregator aggregator = accumulator.getMetricAggregator(function);
if (!(aggregator instanceof AppliedFunctionAggregator)) {
aggregator = new AppliedFunctionAggregator(aggregator);
}
Object v = getFieldValue(value.getMetric(), value.getMetadataId(), function);
aggregator.apply(v);
accumulator.setFunction(aggregator, function);
}

if (!accumulator.initialized()) {
accumulator.setMetadataId(value.getMetadataId());
Expand All @@ -82,6 +67,22 @@ public Accumulator add(KeyedMetricEvent value, Accumulator accumulator) {
accumulator.setMetric(value.getMetric());
accumulator.setKey(value.getKey());

/**
* 执行表达式中 function 的 aggregator
*/
for (ExpressionFunction function : value.getExpression().getFunctions()) {
if (!ExpressionFunctionTrigger.applied.equals(function.getTrigger())) {
continue;
}
FunctionAggregator aggregator = accumulator.getMetricAggregator(function);
if (!(aggregator instanceof AppliedFunctionAggregator)) {
aggregator = new AppliedFunctionAggregator(aggregator);
}
Object v = getFieldValue(value.getMetric(), value.getMetadataId(), function);
aggregator.apply(v);
accumulator.setFunction(aggregator, function);
}

return accumulator;
}

Expand Down

0 comments on commit ed0dd61

Please sign in to comment.