diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/AlertTargetToEventBoxMapFunction.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/AlertTargetToEventBoxMapFunction.java index ecfa554..4048e6d 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/AlertTargetToEventBoxMapFunction.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/AlertTargetToEventBoxMapFunction.java @@ -16,10 +16,9 @@ import cloud.erda.analyzer.alert.models.RenderedAlertEvent; import cloud.erda.analyzer.alert.models.eventbox.*; -import cloud.erda.analyzer.alert.models.eventbox.*; import cloud.erda.analyzer.common.constant.AlertConstants; import cloud.erda.analyzer.common.constant.Constants; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import org.apache.flink.api.common.functions.MapFunction; import java.util.Map; @@ -62,7 +61,7 @@ public EventBoxRequest map(RenderedAlertEvent value) throws Exception { eventBoxChannel.setName(groupType); eventBoxChannel.setTemplate(value.getContent()); // todo 在 3.19 兼容tag使用,eventbox Channel 需要重构,tag 改为map比较好 - eventBoxChannel.setTag(GsonUtil.toJson(value.getMetricEvent().getTags())); + eventBoxChannel.setTag(JsonMapperUtils.toStrings(value.getMetricEvent().getTags())); // todo 只在3.11兼容邮件模板用 if (groupType.equals("email")) { eventBoxChannel.setType("markdown"); diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyEventTemplateRenderFunction.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyEventTemplateRenderFunction.java index 436c234..ebd24c4 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyEventTemplateRenderFunction.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyEventTemplateRenderFunction.java @@ -21,10 +21,9 @@ import cloud.erda.analyzer.alert.templates.TemplateRenderer; 
import cloud.erda.analyzer.alert.templates.formatters.TemplateFormatter; import cloud.erda.analyzer.alert.templates.formatters.TemplateFormatterFactory; -import cloud.erda.analyzer.alert.models.*; import cloud.erda.analyzer.common.constant.AlertConstants; import cloud.erda.analyzer.common.utils.DateUtils; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import org.apache.flink.api.common.functions.MapFunction; import java.util.HashMap; @@ -48,7 +47,7 @@ public RenderedNotifyEvent map(NotifyEvent notifyEvent) throws Exception { result.setTemplateTarget(notifyEvent.getNotifyTemplate().getTemplate().getTarget()); //将fields转换为map Map templateContext = TemplateContext.fromMetric(result.getMetricEvent()); - Map attribute = GsonUtil.toMap(notifyEvent.getNotify().getAttribute(), String.class, Object.class); + Map attribute = JsonMapperUtils.toObjectValueMap(notifyEvent.getNotify().getAttribute()); if (attribute != null) { templateContext.putAll(attribute); } diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyTargetToEventBoxMapFunction.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyTargetToEventBoxMapFunction.java index 1a5432a..9a576a9 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyTargetToEventBoxMapFunction.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/functions/NotifyTargetToEventBoxMapFunction.java @@ -16,10 +16,9 @@ import cloud.erda.analyzer.alert.models.RenderedNotifyEvent; import cloud.erda.analyzer.alert.models.eventbox.*; -import cloud.erda.analyzer.alert.models.eventbox.*; import cloud.erda.analyzer.common.constant.AlertConstants; import cloud.erda.analyzer.common.constant.Constants; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import org.apache.flink.api.common.functions.MapFunction; import java.util.Map; @@ -29,12 
+28,12 @@ public class NotifyTargetToEventBoxMapFunction implements MapFunction tags = renderedNotifyEvent.getMetricEvent().getTags(); + Map tags = renderedNotifyEvent.getMetricEvent().getTags(); EventBoxContent content = new EventBoxContent(); // content.setSourceType(renderedNotifyEvent.getScopeType()); // content.setSourceId(renderedNotifyEvent.getScopeId()); @@ -44,7 +43,7 @@ public EventBoxRequest map(RenderedNotifyEvent renderedNotifyEvent) throws Excep EventBoxChannel eventBoxChannel = new EventBoxChannel(); eventBoxChannel.setName(groupType); eventBoxChannel.setTemplate(renderedNotifyEvent.getContent()); - eventBoxChannel.setTag(GsonUtil.toJson(renderedNotifyEvent.getMetricEvent().getTags())); + eventBoxChannel.setTag(JsonMapperUtils.toStrings(renderedNotifyEvent.getMetricEvent().getTags())); if (groupType.equals("email")) { eventBoxChannel.setType("markdown"); } diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/AlertNotifyTarget.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/AlertNotifyTarget.java index 42472d2..658eec1 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/AlertNotifyTarget.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/AlertNotifyTarget.java @@ -14,7 +14,7 @@ package cloud.erda.analyzer.alert.models; -import com.google.gson.annotations.SerializedName; +import com.fasterxml.jackson.annotation.JsonSetter; import lombok.Data; /** @@ -36,13 +36,13 @@ public class AlertNotifyTarget { */ private String type; - @SerializedName(value = "group_id") + @JsonSetter("group_id") private String groupId; - @SerializedName(value = "group_type") + @JsonSetter("group_type") private String groupType; - @SerializedName(value = "dingding_url") + @JsonSetter("dingding_url") private String dingdingUrl; private String level; diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTarget.java 
b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTarget.java index 0d8d6a5..63c26d1 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTarget.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTarget.java @@ -14,14 +14,16 @@ package cloud.erda.analyzer.alert.models; -import com.google.gson.annotations.SerializedName; +import com.fasterxml.jackson.annotation.JsonSetter; import lombok.Data; @Data public class NotifyTarget { - @SerializedName(value = "group_id") + @JsonSetter("group_id") private String groupId; - @SerializedName(value = "channels") + + @JsonSetter("channels") private String[] channels; + private String dingdingUrl; } diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTemplate.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTemplate.java index 6998303..bb50047 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTemplate.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/NotifyTemplate.java @@ -14,17 +14,22 @@ package cloud.erda.analyzer.alert.models; -import com.google.gson.annotations.SerializedName; +import com.fasterxml.jackson.annotation.JsonSetter; import lombok.Data; import lombok.extern.slf4j.Slf4j; @Slf4j @Data public class NotifyTemplate { - @SerializedName("id") + + @JsonSetter("id") private String notifyId; //模版id + private Metadata metadata; + private Behavior behavior; + private Template[] templates; + private long processingTime; } diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxDingDingLabel.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxDingDingLabel.java index 1ee7dce..94e8650 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxDingDingLabel.java +++ 
b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxDingDingLabel.java @@ -14,7 +14,7 @@ package cloud.erda.analyzer.alert.models.eventbox; -import com.google.gson.annotations.SerializedName; +import com.fasterxml.jackson.annotation.JsonGetter; import lombok.Data; /** @@ -24,12 +24,21 @@ @Data public class EventBoxDingDingLabel { - @SerializedName(value = "DINGDING") - private String[] dingding; - @SerializedName(value = "MARKDOWN") private EventBoxDingDingLabelMarkdown markdown; + private String[] dingding; + + @JsonGetter("MARKDOWN") + public EventBoxDingDingLabelMarkdown getMarkdown() { + return markdown; + } + + @JsonGetter("DINGDING") + public String[] getDingding() { + return dingding; + } + public static EventBoxDingDingLabel label(String title, String... dingding) { EventBoxDingDingLabel eventBoxDingDingLabel = new EventBoxDingDingLabel(); eventBoxDingDingLabel.setDingding(dingding); diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxNotifyGroupLabel.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxNotifyGroupLabel.java index d93464e..c9cab54 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxNotifyGroupLabel.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/models/eventbox/EventBoxNotifyGroupLabel.java @@ -14,7 +14,8 @@ package cloud.erda.analyzer.alert.models.eventbox; -import com.google.gson.annotations.SerializedName; +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonSetter; import lombok.Data; /** @@ -24,6 +25,11 @@ @Data public class EventBoxNotifyGroupLabel { - @SerializedName(value = "GROUP") + @JsonSetter("GROUP") private long group; + + @JsonGetter("GROUP") + public long getGroup() { + return group; + } } diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/EventBoxSink.java 
b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/EventBoxSink.java index 58f6b0e..6fb7f34 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/EventBoxSink.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/EventBoxSink.java @@ -16,7 +16,7 @@ import cloud.erda.analyzer.alert.models.eventbox.EventBoxRequest; import cloud.erda.analyzer.common.constant.Constants; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.common.utils.http.ContentTypes; import cloud.erda.analyzer.common.utils.http.HttpMethods; import cloud.erda.analyzer.common.utils.http.HttpRequestDTO; @@ -57,7 +57,7 @@ public void invoke(EventBoxRequest value, Context context) throws Exception { if (value == null) { return; } - String body = GsonUtil.toJson(value); + String body = JsonMapperUtils.toStrings(value); HttpRequestDTO httpRequestDTO = new HttpRequestDTO(); httpRequestDTO.setBody(body); httpRequestDTO.setContentType(ContentTypes.APPLICATION_JSON); diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/TicketSink.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/TicketSink.java index 8440b90..349e1c8 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/TicketSink.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sinks/TicketSink.java @@ -18,7 +18,7 @@ import cloud.erda.analyzer.alert.models.AlertTrigger; import cloud.erda.analyzer.common.constant.AlertConstants; import cloud.erda.analyzer.common.constant.Constants; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.common.utils.http.ContentTypes; import cloud.erda.analyzer.common.utils.http.HttpAsyncRequestService; import cloud.erda.analyzer.common.utils.http.HttpMethods; @@ -58,7 +58,7 @@ public void open(Configuration parameters) throws Exception { 
@Override public void invoke(Ticket value, Context context) throws Exception { - String body = GsonUtil.toJson(value); + String body = JsonMapperUtils.toStrings(value); HttpRequestDTO httpRequestDTO = new HttpRequestDTO(); String trigger = value.getLabel().getOrDefault(AlertConstants.TRIGGER, AlertTrigger.alert.name()); if (AlertTrigger.alert.equals(AlertTrigger.valueOf(trigger))) { diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/AllNotifyTemplates.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/AllNotifyTemplates.java index 69a1146..af96738 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/AllNotifyTemplates.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/AllNotifyTemplates.java @@ -15,7 +15,7 @@ package cloud.erda.analyzer.alert.sources; import cloud.erda.analyzer.alert.models.NotifyTemplate; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import lombok.extern.slf4j.Slf4j; import lombok.val; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -31,7 +31,7 @@ import java.util.Map; @Slf4j -public class AllNotifyTemplates implements SourceFunction{ +public class AllNotifyTemplates implements SourceFunction { private String monitorAddr; private CloseableHttpClient httpclient; private long httpInterval = 60000; @@ -50,10 +50,10 @@ public ArrayList GetSysTemplateList() throws IOException { CloseableHttpResponse closeableHttpResponse = httpclient.execute(httpGet); try { if (closeableHttpResponse.getStatusLine().getStatusCode() == HttpURLConnection.HTTP_OK) { - String str = EntityUtils.toString(closeableHttpResponse.getEntity()); - Map templateMap = GsonUtil.toMap(str, String.class, Object.class); + byte[] data = EntityUtils.toByteArray(closeableHttpResponse.getEntity()); + Map templateMap = JsonMapperUtils.toObjectValueMap(data); Object templateInfo = templateMap.get("data"); - templateArr 
= GsonUtil.toArrayList(GsonUtil.toJson(templateInfo),NotifyTemplate.class); + templateArr = JsonMapperUtils.toArrayList(JsonMapperUtils.toBytes(templateInfo), NotifyTemplate.class); } } finally { closeableHttpResponse.close(); @@ -78,7 +78,7 @@ public void run(SourceContext sourceContext) throws Exception { @Override public void cancel() { - if(this.httpclient != null) { + if (this.httpclient != null) { try { this.httpclient.close(); } catch (Exception e) { diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyReader.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyReader.java index a1ee0ba..9c11abd 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyReader.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyReader.java @@ -18,7 +18,7 @@ import cloud.erda.analyzer.alert.models.AlertNotify; import cloud.erda.analyzer.alert.models.AlertNotifyTarget; import cloud.erda.analyzer.common.constant.AlertConstants; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.common.utils.StringUtil; import cloud.erda.analyzer.runtime.sources.DataRowReader; import lombok.extern.slf4j.Slf4j; @@ -40,7 +40,7 @@ public AlertNotify read(ResultSet resultSet) throws Exception { notify.setAlertId(resultSet.getString("alert_id")); notify.setEnable(resultSet.getBoolean("enable")); String notifyTargetData = resultSet.getString("notify_target"); - AlertNotifyTarget notifyTarget = GsonUtil.toObject(notifyTargetData, AlertNotifyTarget.class); + AlertNotifyTarget notifyTarget = JsonMapperUtils.toObject(notifyTargetData, AlertNotifyTarget.class); if (AlertConstants.ALERT_NOTIFY_TYPE_NOTIFY_GROUP.equals(notifyTarget.getType())) { notifyTarget.setGroupTypes(notifyTarget.getGroupType().split(",")); } @@ -58,7 +58,9 @@ public AlertNotify read(ResultSet resultSet) throws Exception { 
notify.setSilence(resultSet.getLong("silence")); notify.setSilencePolicy(resultSet.getString("silence_policy")); notify.setProcessingTime(System.currentTimeMillis()); - log.info("Read alert notify {} data: {}", notify.getId(), GsonUtil.toJson(notify)); + if (log.isInfoEnabled()) { + log.info("Read alert notify {} data: {}", notify.getId(), JsonMapperUtils.toStrings(notify)); + } return notify; } catch (Exception ex) { log.warn("Read or deserialize Notify Metadata error.", ex); diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyTemplateReader.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyTemplateReader.java index 4613e8f..37ff213 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyTemplateReader.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/NotifyTemplateReader.java @@ -16,7 +16,7 @@ import cloud.erda.analyzer.alert.models.AlertNotifyTemplate; import cloud.erda.analyzer.alert.models.AlertTrigger; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.runtime.sources.DataRowReader; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -58,14 +58,16 @@ public AlertNotifyTemplate read(ResultSet resultSet) throws Exception { if (StringUtils.isEmpty(formatString)) { formats = new HashMap<>(); } else { - formats = GsonUtil.toMap(formatString, String.class, String.class); + formats = JsonMapperUtils.toStringValueMap(formatString); } notifyTemplate.setFormats(formats); checkNotNull(notifyTemplate.getTitle(), "Title cannot be null"); checkNotNull(notifyTemplate.getTemplate(), "Template cannot be null"); notifyTemplate.setProcessingTime(System.currentTimeMillis()); notifyTemplate.setVariable(templateVariable); - log.info("Read alert notify template {} data: {}",notifyTemplate.getId(), GsonUtil.toJson(notifyTemplate)); + if (log.isInfoEnabled()) { + 
log.info("Read alert notify template {} data: {}", notifyTemplate.getId(), JsonMapperUtils.toStrings(notifyTemplate)); + } return notifyTemplate; } catch (Exception ex) { log.warn("Read or deserialize id {} Custom AlertNotifyTemplate error.", resultSet.getLong("id"), ex); diff --git a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/SpotNotifyReader.java b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/SpotNotifyReader.java index 2e1ae4c..df9024c 100644 --- a/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/SpotNotifyReader.java +++ b/analyzer-alert/src/main/java/cloud/erda/analyzer/alert/sources/SpotNotifyReader.java @@ -16,7 +16,7 @@ import cloud.erda.analyzer.alert.models.NotifyTarget; import cloud.erda.analyzer.alert.models.Notify; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.runtime.sources.DataRowReader; import java.sql.ResultSet; @@ -29,9 +29,9 @@ public Notify read(ResultSet resultSet) throws Exception { Notify notify = new Notify(); Long id = resultSet.getLong("id"); String target = resultSet.getString("target"); - NotifyTarget notifyTarget = GsonUtil.toObject(target, NotifyTarget.class); + NotifyTarget notifyTarget = JsonMapperUtils.toObject(target, NotifyTarget.class); String templateIds = resultSet.getString("notify_id"); - ArrayList templateIDArr = GsonUtil.toArrayList(templateIds,String.class); + ArrayList templateIDArr = JsonMapperUtils.toArrayList(templateIds, String.class); String scopeType = resultSet.getString("scope"); String scopeId = resultSet.getString("scope_id"); String attribute = resultSet.getString("attributes"); diff --git a/analyzer-alert/src/main/java/schemas/RecordSchema.java b/analyzer-alert/src/main/java/schemas/RecordSchema.java index 9e22da3..e892514 100644 --- a/analyzer-alert/src/main/java/schemas/RecordSchema.java +++ b/analyzer-alert/src/main/java/schemas/RecordSchema.java @@ -1,6 +1,6 @@ 
package schemas; -import com.google.gson.Gson; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -9,12 +9,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.Charset; public class RecordSchema implements DeserializationSchema, SerializationSchema { private final static Logger logger = LoggerFactory.getLogger(RecordSchema.class); - private static final Gson gson = new Gson(); private final TypeInformation type; @@ -26,11 +24,10 @@ public RecordSchema(Class type) { @Override public T deserialize(byte[] bytes) throws IOException { - String input = new String(bytes); try { - return gson.fromJson(input, this.type.getTypeClass()); + return JsonMapperUtils.toObject(bytes, this.type.getTypeClass()); } catch (Throwable throwable) { - logger.error("Deserialize record fail. \nSource : {} \n", input, throwable); + logger.error("Deserialize record fail. \nSource : {} \n", new String(bytes), throwable); return null; } } @@ -42,7 +39,12 @@ public boolean isEndOfStream(T record) { @Override public byte[] serialize(T record) { - return gson.toJson(record).getBytes(Charset.forName("UTF-8")); + try { + return JsonMapperUtils.toBytes(record); + } catch (IOException throwable) { + logger.error("Serialize record fail. 
\n", throwable); + return null; + } } @Override diff --git a/analyzer-common/pom.xml b/analyzer-common/pom.xml index ad8b5e4..effac70 100644 --- a/analyzer-common/pom.xml +++ b/analyzer-common/pom.xml @@ -27,31 +27,21 @@ 4.13 test + + com.fasterxml.jackson.core + jackson-databind + 2.9.6 + com.google.guava guava 30.0-jre - - me.atlis - decibelinsight-uasparser - 1.0.12 - - - com.qiniu - ip17mon - 0.2.2 - javax.validation validation-api 2.0.1.Final - - com.jayway.jsonpath - json-path - 2.4.0 - org.apache.httpcomponents httpclient diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/models/MetricEvent.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/models/MetricEvent.java index 9552a63..ac4f9ec 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/models/MetricEvent.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/models/MetricEvent.java @@ -14,6 +14,7 @@ package cloud.erda.analyzer.common.models; +import cloud.erda.analyzer.common.utils.StringBuilderUtils; import lombok.Data; import lombok.val; @@ -62,7 +63,7 @@ public MetricEvent addField(String key, Object val) { } public String toString() { - StringBuilder sb = new StringBuilder(); + StringBuilder sb = StringBuilderUtils.getCachedStringBuilder(); sb.append(timestamp).append(" [Name=").append(name).append("] Tags["); for (Map.Entry tag : tags.entrySet()) { sb.append(tag.getKey()).append("=").append(tag.getValue()).append(" "); diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/CommonSchema.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/CommonSchema.java index b6ce6fa..01d87db 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/CommonSchema.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/CommonSchema.java @@ -1,6 +1,6 @@ package cloud.erda.analyzer.common.schemas; -import com.google.gson.Gson; +import 
cloud.erda.analyzer.common.utils.JsonMapperUtils; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -9,12 +9,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.Charset; public class CommonSchema implements DeserializationSchema, SerializationSchema { private final static Logger logger = LoggerFactory.getLogger(CommonSchema.class); - private static final Gson gson = new Gson(); private final TypeInformation type; @@ -23,14 +21,12 @@ public CommonSchema(Class type) { this.type = TypeInformation.of(type); } - @Override public T deserialize(byte[] bytes) throws IOException { - String input = new String(bytes); try { - return gson.fromJson(input, this.type.getTypeClass()); + return JsonMapperUtils.toObject(bytes, type.getTypeClass()); } catch (Throwable throwable) { - logger.error("Deserialize record fail. \nSource : {} \n", input, throwable); + logger.error("Deserialize record fail. \n", throwable); return null; } } @@ -42,7 +38,12 @@ public boolean isEndOfStream(T record) { @Override public byte[] serialize(T record) { - return gson.toJson(record).getBytes(Charset.forName("UTF-8")); + try { + return JsonMapperUtils.toBytes(record); + } catch (IOException e) { + logger.error("Serialize record fail. \n", e); + return null; + } } @Override diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/LogEventSchema.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/LogEventSchema.java deleted file mode 100644 index 85a30d2..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/LogEventSchema.java +++ /dev/null @@ -1,125 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cloud.erda.analyzer.common.schemas; - -import cloud.erda.analyzer.common.constant.LogConstant; -import cloud.erda.analyzer.common.models.LogEvent; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class LogEventSchema implements SerializationSchema, DeserializationSchema { - - private static final Logger logger = LoggerFactory.getLogger(LogEventSchema.class); - - private static final String DEFAULT_LOG_STREAM = "stdout"; - private static final String DEFAULT_LOG_LEVEL = "INFO"; - - private static final Gson GSON = new Gson(); - - private String[] idKeys; - - public LogEventSchema(String idKeys) { - this.idKeys = idKeys.split(","); - } - - @Override - public LogEvent deserialize(byte[] bytes) { - LogEvent logEvent; - try { - logEvent = GSON.fromJson(new String(bytes), LogEvent.class); - - Map tags = logEvent.getTags(); - if (tags == null) { - tags = Maps.newHashMap(); - logEvent.setTags(tags); - } - - String level = tags.get(LogConstant.LEVEL_KEY); - - if (level == null) { - level = DEFAULT_LOG_LEVEL; - } else { - level = level.toUpperCase(); - } - - tags.put(LogConstant.LEVEL_KEY, level); - - for (String idKey : idKeys) { - String id = tags.get(idKey); - if (id != null) { - 
logEvent.setId(id); - break; - } - } - - if (StringUtils.isBlank(logEvent.getStream())) { - logEvent.setStream(DEFAULT_LOG_STREAM); - } - - if (logEvent.getOffset() == null) { - logEvent.setOffset(0L); - } - - return logEvent; - } catch (Exception e) { - logger.error("Deserialize LogEvent fail", e); - return null; - } - } - - private String getRequestId(Object extend) { - if (extend == null) { - return null; - } - - String[] arr = extend.toString().trim().split(","); - if (arr.length < 2) { - return null; - } - return arr[1]; - } - - private void addKeysToMap(Map from, Map to, String... keys) { - for (String key : keys) { - Object value = from.get(key); - if (value != null) { - to.put(key, value.toString()); - } - } - } - - @Override - public boolean isEndOfStream(LogEvent logEvent) { - return false; - } - - @Override - public byte[] serialize(LogEvent logEvent) { - return GSON.toJson(logEvent).getBytes(); - } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(LogEvent.class); - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSchema.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSchema.java index 7c051f5..f9917cb 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSchema.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSchema.java @@ -15,7 +15,7 @@ package cloud.erda.analyzer.common.schemas; import cloud.erda.analyzer.common.models.MetricEvent; -import com.google.gson.Gson; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -24,22 +24,18 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; @Slf4j public 
class MetricEventSchema implements DeserializationSchema, SerializationSchema { private final static Logger logger = LoggerFactory.getLogger(MetricEventSchema.class); - private static final Gson gson = new Gson(); @Override public MetricEvent deserialize(byte[] bytes) throws IOException { - String input = new String(bytes); try { - return gson.fromJson(input, MetricEvent.class); + return JsonMapperUtils.toObject(bytes, MetricEvent.class); } catch (Throwable throwable) { - logger.error("Deserialize metric event fail. \nSource : {} \n", input, throwable); + logger.error("Deserialize metric event fail. \nSource : {} \n", new String(bytes), throwable); return null; } } @@ -52,7 +48,7 @@ public boolean isEndOfStream(MetricEvent metricEvent) { @Override public byte[] serialize(MetricEvent metricEvent) { try { - return gson.toJson(metricEvent).getBytes(StandardCharsets.UTF_8); + return JsonMapperUtils.toBytes(metricEvent); } catch (Exception exception) { logger.error("Serialize metric event fail. 
{}", metricEvent.toString(), exception); return null; diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSerializeFunction.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSerializeFunction.java index 6da90be..cca4731 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSerializeFunction.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/MetricEventSerializeFunction.java @@ -17,13 +17,11 @@ package cloud.erda.analyzer.common.schemas; import cloud.erda.analyzer.common.models.MetricEvent; -import com.google.gson.Gson; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; -import java.nio.charset.StandardCharsets; - /** * @author liuhaoyang * @date 2021/10/15 17:47 @@ -31,12 +29,10 @@ @Slf4j public class MetricEventSerializeFunction implements FlatMapFunction { - private static final Gson gson = new Gson(); - @Override public void flatMap(MetricEvent metricEvent, Collector collector) throws Exception { try { - collector.collect(gson.toJson(metricEvent)); + collector.collect(JsonMapperUtils.toStrings(metricEvent)); } catch (Exception exception) { log.error("Serialize metric event fail. {}", metricEvent.toString(), exception); } diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/RequestEventSchema.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/RequestEventSchema.java deleted file mode 100644 index cfaefb2..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/RequestEventSchema.java +++ /dev/null @@ -1,408 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cloud.erda.analyzer.common.schemas; - -import cloud.erda.analyzer.common.models.MetricEvent; -import cloud.erda.analyzer.common.schemas.requests.*; -import cloud.erda.analyzer.common.utils.NumberParser; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; -import cloud.erda.analyzer.common.schemas.requests.*; -import lombok.val; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import qiniu.ip17mon.LocationInfo; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; - -public class RequestEventSchema implements DeserializationSchema { - - private final static Logger logger = LoggerFactory.getLogger(RequestEventSchema.class); - private final static Gson gson = new Gson(); - private final static long MILLISECOND = 1; - private final static long NANOSECOND = 1000 * 1000; - - private static class IngoreException extends IOException { - } - - @Override - public MetricEvent deserialize(byte[] bytes) throws IOException { - if (bytes == null || bytes.length <= 0) { - logger.error("event is empty"); - return null; - } - String message = new String(bytes, StandardCharsets.UTF_8); - String[] parts = message.split("\\,", 5); - if (parts.length != 5) { - logger.error("invalid event", message); - return null; - } - try { - val metric = new MetricEvent(); - val data = FormParams.parse(parts[4]); - if (data == null) return 
null; - String name = data.get("t"); - metric.setTimestamp(Long.parseLong(data.get("date")) * NANOSECOND); - // set common tags - metric.addTag("tk", parts[0]); - metric.addTag("cid", parts[1]); - metric.addTag("uid", parts[2]); - metric.addTag("vid", data.get("vid")); - metric.addTag("ip", parts[3]); - metric.addTag("host", data.get("dh")); - metric.addTag("doc_path", PathUtils.getRoute(data.get("dp"))); - switch (name) { - case "req": - case "ajax": - case "request": - return toRequestMetricEvent(data, metric); - case "timing": - return toTimingMetricEvent(data, metric); - case "error": - return toErrorMetricEvent(data, metric); - case "device": - return toDeviceMetricEvent(data, metric); - case "browser": - return toBrowserMetricEvent(data, metric); - case "document": - return toDocumentMetricEvent(data, metric); - case "script": - return toScriptMetricEvent(data, metric); - case "event": - return toEventMetricEvent(data, metric); - default: - logger.error(String.format("unknown t=%s in data", name)); - return null; - } - } catch (IngoreException e) { - return null; - } catch (IOException e) { - logger.error("Deserialize browser insight event fail. \n " + e.getMessage()); - return null; - } catch (Exception e) { - logger.error("Deserialize browser insight event fail. \n " + e.getMessage()); - return null; - } - } - - private MetricEvent toRequestMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_req"); - Map fields = metric.getFields(); - appendMobileInfoIfNeed(data, metric); - fields.put("tt", NumberParser.parseLong(data.get("tt"), 0) * MILLISECOND); - fields.put("req", NumberParser.parseDouble(data.get("req"), 0)); - fields.put("res", NumberParser.parseDouble(data.get("res"), 0)); - int status = NumberParser.parseInt(data.get("st"), 0); - fields.put("errors", status >= 400 ? 
1 : 0); - fields.put("status", status); - - String url = data.get("url"); - metric.addTag("url", url); - metric.addTag("req_path", PathUtils.getRoute(url)); - metric.addTag("status_code", data.get("st")); - metric.addTag("method", data.get("me")); - return metric; - } - - private MetricEvent toTimingMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_timing"); - Map fields = metric.getFields(); - boolean isMobile = appendMobileInfoIfNeed(data, metric); - if (isMobile) { - long nt = NumberParser.parseLong(data.get("nt"), 0); - fields.put("plt", nt * MILLISECOND); - } else { - UserAgent ua = UserAgent.parse(data.get("ua")); - metric.addTag("browser", ua.browser); - metric.addTag("browser_version", ua.browserVersion); - metric.addTag("os", ua.os); - metric.addTag("osv", ua.osVersion); - metric.addTag("device", ua.device); - - String timingStr = data.get("pt"); - long plt, act, dns, tcp, srt, net; - if (timingStr == null || timingStr.length() == 0) { - NavigationTiming nt = NavigationTiming.parse(data.get("nt")); - plt = putPositiveValue(fields, "plt", (nt.loadTime + nt.readyStart) * MILLISECOND); // ? loadTime - putPositiveValue(fields, "rrt", nt.redirectTime * MILLISECOND); - putPositiveValue(fields, "put", nt.unloadEventTime * MILLISECOND); - act = putPositiveValue(fields, "act", nt.appCacheTime * MILLISECOND); - dns = putPositiveValue(fields, "dns", nt.lookupDomainTime * MILLISECOND); - tcp = putPositiveValue(fields, "tcp", nt.connectTime * MILLISECOND); - putPositiveValue(fields, "rqt", (nt.requestTime - nt.responseTime) * MILLISECOND); // ? requestTime - putPositiveValue(fields, "rpt", nt.responseTime * MILLISECOND); - srt = putPositiveValue(fields, "srt", nt.requestTime * MILLISECOND); // ? 
requestTime + responseTime - putPositiveValue(fields, "dit", nt.initDomTreeTime * MILLISECOND); - putPositiveValue(fields, "drt", nt.domReadyTime * MILLISECOND); - putPositiveValue(fields, "clt", nt.loadEventTime * MILLISECOND); - putPositiveValue(fields, "set", nt.scriptExecuteTime * MILLISECOND); - putPositiveValue(fields, "wst", ( - nt.redirectTime + nt.appCacheTime + nt.lookupDomainTime + nt.connectTime + nt.requestTime - nt.responseTime - // ? redirectTime + appCacheTime + lookupDomainTime + connectTime - ) * MILLISECOND); - putPositiveValue(fields, "fst", ( - (nt.loadTime + nt.readyStart) - (nt.initDomTreeTime + nt.domReadyTime + nt.loadEventTime) - // ? loadTime - (initDomTreeTime + domReadyTime + loadEventTime) - ) * MILLISECOND); - putPositiveValue(fields, "pct", ( - (nt.loadTime + nt.readyStart) - (nt.domReadyTime + nt.loadEventTime) - // ? loadTime - (domReadyTime + loadEventTime) - ) * MILLISECOND); - putPositiveValue(fields, "rct", ( - (nt.loadTime + nt.readyStart) - nt.loadEventTime - // ? 
loadTime - loadEventTime - ) * MILLISECOND); - } else { - PerformanceTiming pt = PerformanceTiming.parse(timingStr); - plt = putPositiveValue(fields, "plt", pt.loadEventEnd - pt.navigationStart); - putPositiveValue(fields, "rrt", pt.redirectEnd - pt.redirectStart); - putPositiveValue(fields, "put", pt.unloadEventEnd - pt.unloadEventStart); - act = putPositiveValue(fields, "act", pt.domainLookupStart - pt.fetchStart); - dns = putPositiveValue(fields, "dns", pt.domainLookupEnd - pt.domainLookupStart); - tcp = putPositiveValue(fields, "tcp", pt.connectEnd - pt.connectStart); - putPositiveValue(fields, "rqt", pt.responseStart - pt.requestStart); - putPositiveValue(fields, "rpt", pt.responseEnd - pt.responseStart); - srt = putPositiveValue(fields, "srt", pt.responseEnd - pt.requestStart); - putPositiveValue(fields, "dit", pt.domInteractive - pt.responseEnd); - putPositiveValue(fields, "drt", pt.domComplete - pt.domInteractive); - putPositiveValue(fields, "clt", pt.loadEventEnd - pt.loadEventStart); - putPositiveValue(fields, "set", pt.domContentLoadedEventEnd - pt.domContentLoadedEventStart); - putPositiveValue(fields, "wst", pt.responseStart - pt.navigationStart); - putPositiveValue(fields, "fst", pt.firstPaintTime); - putPositiveValue(fields, "pct", - (pt.loadEventEnd - pt.fetchStart) + (pt.fetchStart - pt.navigationStart) - - ((pt.domComplete - pt.domInteractive) + (pt.loadEventEnd - pt.loadEventStart))); - putPositiveValue(fields, "rct", - (pt.loadEventEnd - pt.fetchStart) + (pt.fetchStart - pt.navigationStart) - - (pt.loadEventEnd - pt.loadEventStart)); - } - net = act + tcp + dns; - putPositiveValue(fields, "net", net); - putPositiveValue(fields, "prt", plt - srt - net); - - List rtList = null; - try { - rtList = ResourceTiming.parseToList(data.get("rt")); - } catch (JsonSyntaxException e) { - logger.error("invalid timing.rt", data.get("rt")); - return null; - } - putPositiveValue(fields, "rlt", ResourceTiming.resourceTiming(rtList) * MILLISECOND); - 
putPositiveValue(fields, "rdc", ResourceTiming.resourceDnsCount(rtList)); - - if (plt > 8000 * MILLISECOND) { - metric.addTag("slow", "true"); - } - } - - metric.addTag("country", "中国"); - metric.addTag("province", "局域网"); - metric.addTag("city", "局域网"); - // metric.addTag("isp", ""); - String ip = metric.getTags().get("ip"); - if (ip != null && ip.length() > 0 && !ip.contains(":")) { - LocationInfo info = IPDB.find(ip); - metric.addTag("country", info.country); - metric.addTag("province", info.state); - metric.addTag("city", info.city); - // metric.addTag("isp", info.isp); - } - return metric; - } - - private MetricEvent toErrorMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_error"); - metric.getFields().put("count", 1); - boolean isMobile = appendMobileInfoIfNeed(data, metric); - if (!isMobile) { -// metric.addTag("source", data.get("ers")); -// metric.addTag("line_no", data.get("")); -// metric.addTag("column_no", data.get("erc")); - UserAgent ua = UserAgent.parse(data.get("ua")); - metric.addTag("browser", ua.browser); -// metric.addTag("browser_version", ua.browserVersion); -// metric.addTag("os", ua.os); -// metric.addTag("osv", ua.osVersion); -// metric.addTag("device", ua.device); - } -// metric.addTag("vid", data.get("vid")); - metric.addTag("error", data.get("erm")); -// metric.addTag("stack_trace", data.get("sta")); - return metric; - } - - private MetricEvent toDeviceMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_device"); - metric.getFields().put("count", 1); - boolean isMobile = appendMobileInfoIfNeed(data, metric); - if (isMobile) { - metric.addTag("channel", data.get("ch")); - if ("1".equals(data.get("jb"))) { - metric.addTag("jb", "true"); - } else { - metric.addTag("jb", "false"); - } - metric.addTag("cpu", data.get("cpu")); - metric.addTag("sdk", data.get("sdk")); - metric.addTag("sd", data.get("sd")); - metric.addTag("mem", data.get("men")); - metric.addTag("rom", 
data.get("rom")); - } - metric.addTag("sr", data.get("sr")); - // metric.addTag("vid", data.get("vid")); - return metric; - } - - private MetricEvent toBrowserMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_browser"); - metric.getFields().put("count", 1); - boolean isMobile = appendMobileInfoIfNeed(data, metric); - if (isMobile) { - metric.addTag("sr", data.get("sr")); - } else { - UserAgent ua = UserAgent.parse(data.get("ua")); - metric.addTag("browser", ua.browser); - metric.addTag("ce", data.get("ce")); - metric.addTag("vp", data.get("vp")); - metric.addTag("ul", data.get("ul")); - metric.addTag("sr", data.get("sr")); - metric.addTag("sd", data.get("sd")); - metric.addTag("fl", data.get("fl")); - } - return metric; - } - - private MetricEvent toDocumentMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_document"); - metric.getFields().put("count", 1); - boolean isMobile = appendMobileInfoIfNeed(data, metric); - if (!isMobile) { - metric.addTag("ds", data.get("ds")); - metric.addTag("dr", data.get("dr")); - metric.addTag("de", data.get("de")); - metric.addTag("dk", data.get("dk")); - metric.addTag("dl", data.get("dl")); - } - metric.addTag("dt", data.get("dt")); - metric.addTag("tp", data.get("tp")); - return metric; - } - - private MetricEvent toScriptMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_script"); - metric.getFields().put("count", 1); - boolean isMobile = appendMobileInfoIfNeed(data, metric); - if (!isMobile) { - UserAgent ua = UserAgent.parse(data.get("ua")); - metric.addTag("browser", ua.browser); - // setTag(tags, "browser_version", ua.browserVersion); - metric.addTag("os", ua.os); - // setTag(tags, "osv", ua.osVersion); - metric.addTag("device", ua.device); - } - String msg = data.get("msg"); - if (msg != null) { - String[] lines = msg.split("\\n"); - metric.addTag("error", lines[0]); - } - metric.addTag("source", data.get("source")); - 
metric.addTag("line_no", data.get("lineno")); - metric.addTag("column_no", data.get("colno")); - metric.addTag("message", data.get("msg")); - return metric; - } - - private MetricEvent toEventMetricEvent(Map data, MetricEvent metric) throws Exception { - metric.setName("ta_event"); - metric.getFields().put("count", 1); - boolean isMobile = appendMobileInfoIfNeed(data, metric); - metric.getFields().put("x", Long.parseLong(data.get("x"))); - metric.getFields().put("y", Long.parseLong(data.get("y"))); - metric.addTag("x", data.get("x")); - metric.addTag("y", data.get("y")); - metric.addTag("xp", data.get("xp")); - if (isMobile) { - metric.addTag("en", data.get("en")); - metric.addTag("ei", data.get("ei")); - } - return metric; - } - - private long putPositiveValue(Map fields, String key, long value) { - if (value < 0) { - throw new RuntimeException(key + " must be positive number " + value); - } - fields.put(key, value); - return value; - } - - private boolean appendMobileInfoIfNeed(Map data, MetricEvent metric) throws IngoreException { - String ua = data.get("ua"); - if (isMobile(ua)) { - // throw new IngoreException(); - metric.setName(metric.getName() + "_mobile"); - metric.addTag("type", "mobile"); - appendMobileInfo(data, metric); - return true; - } else { - metric.addTag("type", "browser"); - return false; - } - } - - public boolean isMobile(String ua) { - if (ua == null) return false; - ua = ua.toLowerCase(); - if ("ios".equals(ua) || "android".equals(ua)) { - return true; - } - return false; - } - - private void appendMobileInfo(Map data, MetricEvent metric) { - metric.addTag("ns", data.remove("ns")); - metric.addTag("av", data.remove("av")); - metric.addTag("br", data.remove("br")); - metric.addTag("gps", data.remove("gps")); - metric.addTag("osv", data.remove("osv")); - String val = data.get("osn"); - if (val == null || "".equals(val)) { - metric.addTag("os", UserAgent.OTHER); - } else { - metric.addTag("os", val); - } - val = data.get("md"); - if (val == 
null || "".equals(val)) { - metric.addTag("device", UserAgent.OTHER); - } else { - metric.addTag("device", val); - } - } - - @Override - public boolean isEndOfStream(MetricEvent metricEvent) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return TypeInformation.of(MetricEvent.class); - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/SpanEventSchema.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/SpanEventSchema.java index 46b3d97..70f09f9 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/SpanEventSchema.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/SpanEventSchema.java @@ -16,8 +16,7 @@ import cloud.erda.analyzer.common.models.MetricEvent; import cloud.erda.analyzer.common.models.SpanEvent; -import com.google.gson.Gson; -import lombok.val; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -26,17 +25,15 @@ import java.io.IOException; import java.math.BigDecimal; -import java.nio.charset.Charset; public class SpanEventSchema implements DeserializationSchema, SerializationSchema { private final static Logger logger = LoggerFactory.getLogger(SpanEventSchema.class); - private final static Gson gson = new Gson(); @Override public SpanEvent deserialize(byte[] bytes) throws IOException { try { - val metric = gson.fromJson(new String(bytes), MetricEvent.class); + MetricEvent metric = JsonMapperUtils.toObject(bytes, MetricEvent.class); if (metric.getName() == null || metric.getName().length() <= 0) return null; SpanEvent span = new SpanEvent(); span.setTags(metric.getTags()); @@ -65,6 +62,11 @@ public TypeInformation getProducedType() { @Override public byte[] serialize(SpanEvent spanEvent) { - return 
gson.toJson(spanEvent).getBytes(Charset.forName("UTF-8")); + try { + return JsonMapperUtils.toBytes(spanEvent); + } catch (Throwable e) { + logger.error("Serialize SpanEvent fail ", e); + return null; + } } } diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/FormParams.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/FormParams.java deleted file mode 100644 index 85da3a9..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/FormParams.java +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package cloud.erda.analyzer.common.schemas.requests; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.util.HashMap; - -public class FormParams extends HashMap { - - private final static Logger logger = LoggerFactory.getLogger(FormParams.class); - - // parse("key1=val1&key2=val 2&key3=val%203") = {"key1":"val1","key2":"val 2","key3":"val 3"} - public static FormParams parse(String queryString) { - FormParams params = new FormParams(); - String[] kvs = queryString.split("\\&"); - try { - int idx = -1; - for(String item : kvs) { - idx = item.indexOf("="); - if(idx > 0 && idx < item.length() - 1) { - String key = item.substring(0, idx); // URLDecoder.decode(item.substring(0, idx);,"UTF-8"); - params.put(key, URLDecoder.decode(item.substring(idx + 1),"UTF-8")); - } - } - } catch (UnsupportedEncodingException e) { - logger.error("invalid data in message" + queryString); - return null; - } - return params; - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/IPDB.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/IPDB.java deleted file mode 100644 index 2f9a984..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/IPDB.java +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package cloud.erda.analyzer.common.schemas.requests; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import qiniu.ip17mon.LocationInfo; -import qiniu.ip17mon.Locator; -import java.io.File; -import java.io.InputStream; -import java.net.URL; - -public class IPDB implements java.io.Serializable { - private final static Logger logger = LoggerFactory.getLogger(IPDB.class); - private static Locator locator = null; - - public static LocationInfo find(String ip) { - if(locator == null) { - return new LocationInfo("中国","局域网","局域网",""); - } - return locator.find(ip); - } - - static { - try { - File file = getFile("ipdb.dat"); - if(file == null) { - InputStream is = getStream("ipdb.dat"); - if(is != null) { - locator = Locator.loadFromStreamOld(is); - is.close(); - } else { - logger.error("ipdb.dat not exist"); - } - } else { - locator = Locator.loadFromLocal(file.getPath()); - } - } catch (Exception e) { - logger.error("ipdb load failed.",e); - } - } - - private static File getFile(String name) { - File file = new File(name); - if(!file.exists()) { - URL url = IPDB.class.getClassLoader().getResource(name); - if(url != null) { - file = new File(url.getFile()); - if(file.exists()) { - return file; - } - } - file = new File("/mnt/mesos/sandbox/" + name); - if(file.exists()) { - return file; - } - return null; - } - return file; - } - - private static InputStream getStream(String name) { - return IPDB.class.getClassLoader().getResourceAsStream(name); - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/NavigationTiming.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/NavigationTiming.java deleted file mode 100644 index 8935d90..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/NavigationTiming.java +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. 
-// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cloud.erda.analyzer.common.schemas.requests; - -import cloud.erda.analyzer.common.utils.NumberParser; - -public class NavigationTiming { - public long loadTime = 0; - public long readyStart = 0; - public long domReadyTime = 0; - public long scriptExecuteTime = 0; - public long requestTime = 0; - public long responseTime = 0; - public long initDomTreeTime = 0; - public long loadEventTime = 0; - public long unloadEventTime = 0; - public long appCacheTime = 0; - public long connectTime = 0; - public long lookupDomainTime = 0; - public long redirectTime = 0; - - public static NavigationTiming parse(String nt) { - NavigationTiming result = new NavigationTiming(); - if(nt==null || "".equals(nt)) return result; - String[] times = nt.split("\\,"); - long[] ts = new long[13]; - ts[0]= NumberParser.parseLong(times[0],0, 36); - for(int i = 1; i < 13 && i < times.length; i++) { - ts[i] = NumberParser.parseLong(times[i],0, 36); - if(ts[i] > ts[0]) { - ts[i] = 0; - } - } - result.loadTime = ts[0]; - result.readyStart = ts[1]; - result.domReadyTime = ts[2]; - result.scriptExecuteTime = ts[3]; - result.requestTime = ts[4]; - result.responseTime = ts[5]; - result.initDomTreeTime = ts[6]; - result.loadEventTime = ts[7]; - result.unloadEventTime = ts[8]; - result.appCacheTime = ts[9]; - result.connectTime = ts[10]; - result.lookupDomainTime = ts[11]; - result.redirectTime = ts[12]; - return result; - } -} diff --git 
a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/PathUtils.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/PathUtils.java deleted file mode 100644 index af6a995..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/PathUtils.java +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cloud.erda.analyzer.common.schemas.requests; - -public class PathUtils { - - public static String getRoute(String url) { - return removeNumber(getPath(url)); - } - - // "/" -> / - // "/abc" -> /abc - // "/abc/123.0" -> /abc/{number} - // "/abc/cef/" -> /abc/cef/ - // "/123" -> /{number} - // "/123/456" -> /{number}/{number} - // "/123/123/" -> /{number}/{number}/ - private static String removeNumber(String path) { - String[] parts = path.split("\\/",-1); - StringBuilder sb = new StringBuilder(); - for(int i=0, last = parts.length -1; i < parts.length; i++) { - try { - Double.parseDouble(parts[i]); - sb.append("{number}"); - } catch (NumberFormatException e) { - sb.append(parts[i]); - } - if(i != last) { - sb.append("/"); - } - } - return sb.toString(); - } - - // "http://host/path1/path2?query1=value1&query2=value2" -> /path1/path2 - // "//host/path1/path2?query1=value1&query2=value2" -> /path1/path2 - // "/path1/path2?query1=value1&query2=value2" -> /path1/path2 - // "//?query1=value1&query2=value2" -> / - // 
"//host?query1=value1&query2=value2" -> / - // "path1/path2?query1=value1&query2=value2" -> /path1/path2 - public static String getPath(String url) { - if (url == null || "".equals(url)) return ""; - int start = url.indexOf("//"); - if (start < 0) { - start = 0; - } else { - url = url.substring(start + 2); - start = url.indexOf("/"); - if (start < 0) { - return "/"; - } - } - int end = url.indexOf("?"); - if (end < 0) { - end = url.length(); - } - url = url.substring(start, end); - if (url.length() <= 0) { - return "/"; - } else if (!url.startsWith("/")) { - return "/" + url; - } - return url; - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/PerformanceTiming.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/PerformanceTiming.java deleted file mode 100644 index 2aa896b..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/PerformanceTiming.java +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package cloud.erda.analyzer.common.schemas.requests; - -import cloud.erda.analyzer.common.utils.NumberParser; - -public class PerformanceTiming { - public long firstPaintTime = 0; - public long connectEnd = 0; - public long connectStart = 0; - public long domComplete = 0; - public long domContentLoadedEventEnd = 0; - public long domContentLoadedEventStart = 0; - public long domInteractive = 0; - public long domLoading = 0; - public long domainLookupEnd = 0; - public long domainLookupStart = 0; - public long fetchStart = 0; - public long loadEventEnd = 0; - public long loadEventStart = 0; - public long navigationStart = 0; - public long redirectEnd = 0; - public long redirectStart = 0; - public long requestStart = 0; - public long responseEnd = 0; - public long responseStart = 0; - public long secureConnectionStart = 0; - public long unloadEventEnd = 0; - public long unloadEventStart = 0; - - public static PerformanceTiming parse(String nt) { - PerformanceTiming result = new PerformanceTiming(); - if(nt==null || "".equals(nt)) return result; - String[] times = nt.split("\\,"); - long[] ts = new long[22]; - for(int i = 0; i < 22 && i < times.length; i++) { - ts[i] = NumberParser.parseLong(times[i],0, 36); - } - result.firstPaintTime = ts[0]; - if(result.firstPaintTime < 0) result.firstPaintTime = 0; - result.connectEnd = ts[1]; - result.connectStart = ts[2]; - result.domComplete = ts[3]; - result.domContentLoadedEventEnd = ts[4]; - result.domContentLoadedEventStart = ts[5]; - result.domInteractive = ts[6]; - result.domLoading = ts[7]; - result.domainLookupEnd = ts[8]; - result.domainLookupStart = ts[9]; - result.fetchStart = ts[10]; - result.loadEventEnd = ts[11]; - result.loadEventStart = ts[12]; - result.navigationStart = ts[13]; - result.redirectEnd = ts[14]; - result.redirectStart = ts[15]; - result.requestStart = ts[16]; - result.responseEnd = ts[17]; - result.responseStart = ts[18]; - result.secureConnectionStart = ts[19]; - result.unloadEventEnd = ts[20]; - 
result.unloadEventStart = ts[21]; - return result; - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/ResourceTiming.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/ResourceTiming.java deleted file mode 100644 index ae43172..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/ResourceTiming.java +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package cloud.erda.analyzer.common.schemas.requests; - -import com.google.gson.JsonSyntaxException; -import cloud.erda.analyzer.common.utils.GsonUtil; -import cloud.erda.analyzer.common.utils.NumberParser; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class ResourceTiming { - - private final static Logger logger = LoggerFactory.getLogger(ResourceTiming.class); - - private static Map InitiatorType = new HashMap(); - - static { - InitiatorType.put("0", "other"); - InitiatorType.put("1", "img"); - InitiatorType.put("2", "link"); - InitiatorType.put("3", "script"); - InitiatorType.put("4", "css"); - InitiatorType.put("5", "xmlhttprequest"); - InitiatorType.put("6", "iframe"); - InitiatorType.put("7", "image"); - } - - public String name; - public int initiatorType; - public long startTime; - public long responseEnd; - public long responseStart; - public long requestStart; - public long connectEnd; - public long secureConnectionStart; - public long connectStart; - public long domainLookupEnd; - public long domainLookupStart; - public long redirectEnd; - public long redirectStart; - - public static List parseToList(String rt) throws JsonSyntaxException { - List list = new ArrayList(); - if(rt == null || "".equals(rt)) return list; - Map map = GsonUtil.toObject(rt, Map.class); - Map resMap = new HashMap(); - decodeResource(resMap, map, ""); - for(Map.Entry entry : resMap.entrySet()) { - String val = entry.getValue(); - if(val == null || "".equals(val) || val.length() < 2) { - continue; - } - String typeKey = val.substring(0, 1); - if(InitiatorType.get(typeKey) == null) { - continue; - } - String timing = val.substring(1); - long[] times = resTimingDecode(timing); - ResourceTiming item = new ResourceTiming(); - item.name = entry.getKey(); - item.initiatorType = NumberParser.parseInt(typeKey,0); - item.startTime = times[0]; - item.responseEnd = 
times[1]; - item.responseStart = times[2]; - item.requestStart = times[3]; - item.connectEnd = times[4]; - item.secureConnectionStart = times[5]; - item.connectStart = times[6]; - item.domainLookupEnd = times[7]; - item.domainLookupStart = times[8]; - item.redirectEnd = times[9]; - item.redirectStart = times[10]; - list.add(item); - } - return list; - } - - private static void decodeResource(Map output, Map input, String preKey) { - for(Map.Entry entry: input.entrySet()) { - if(Map.class.isAssignableFrom(entry.getValue().getClass())) { - decodeResource(output, (Map)entry.getValue(),preKey+entry.getKey()); - } else { - output.put(preKey+entry.getKey(), entry.getValue().toString()); - } - } - } - - private static long[] resTimingDecode(String timing) { - long[] times = new long[11]; - String[] parts = timing.split("\\,"); - times[0] = NumberParser.parseLong(parts[0],0, 36); - for(int i = 1; i < 11 && i < parts.length; i++) { - times[i] = times[0] + NumberParser.parseLong(parts[i],0, 36); - } - return times; - } - - public static long resourceTiming(List resTiming) { - if(resTiming == null || resTiming.size() <= 0) return 0; - long firstStart = Long.MAX_VALUE; - long lastEnd = Long.MIN_VALUE; - for(ResourceTiming res : resTiming) { - if(res.startTime < firstStart) { - firstStart = res.startTime; - } - if(res.responseEnd > lastEnd) { - lastEnd = res.responseEnd; - } - } - long rlt = lastEnd - firstStart; - if(rlt < 0) { - logger.error(String.format("Resource loading time must be positive, instead got: {}, restTiming: {}", rlt, resTiming)); - return 0; - } - return rlt; - } - - public static long resourceDnsCount(List resTiming) { - if(resTiming == null || resTiming.size() <= 0) return 0; - long count = 0; - for(ResourceTiming res : resTiming) { - if((res.domainLookupEnd - res.domainLookupStart) > 0) { - count++; - } - } - return count; - } - -} \ No newline at end of file diff --git 
a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/UserAgent.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/UserAgent.java deleted file mode 100644 index 1a9135d..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/schemas/requests/UserAgent.java +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cloud.erda.analyzer.common.schemas.requests; - -import com.decibel.uasparser.OnlineUpdater; -import com.decibel.uasparser.UASparser; -import com.decibel.uasparser.UserAgentInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.IOException; - -public class UserAgent { - private final static Logger logger = LoggerFactory.getLogger(UserAgent.class); - - private static UASparser parser = null; - static { - try { - parser = new UASparser(OnlineUpdater.getVendoredInputStream()); - } catch (Exception e) { - logger.error("init UASparser fail.", e); - } - } - - public final static String OTHER = "其他"; - - public String browser = OTHER; - public String browserVersion = ""; - public String os = OTHER; - public String osVersion = ""; - public String device = OTHER; - - public static UserAgent parse(String ua) { - UserAgent result = new UserAgent(); - if(ua == null || "".equals(ua)) { - return result; - } - UserAgentInfo info = null; - try { - info = parser.parse(ua); - } catch (IOException e) { - 
return result; - } - if(info == null) return result; - String val = info.getOsName(); - if(val != null && !"unknown".equals(val)) { - result.os = val; - } - val = info.getUaFamily(); - if(val != null && !"unknown".equals(val)) { - result.browser = val; - } - val = info.getBrowserVersionInfo(); - if(val != null && !"unknown".equals(val)) { - result.browserVersion = val; - } - val = info.getDeviceType(); - if(val != null && !"unknown".equals(val)) { - result.device = val; - } - return result; - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/sinks/TerminusCassandraSink.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/sinks/TerminusCassandraSink.java index 44f31f8..8c2ab1f 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/sinks/TerminusCassandraSink.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/sinks/TerminusCassandraSink.java @@ -18,11 +18,11 @@ import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; -import com.google.common.base.Throwables; import cloud.erda.analyzer.common.utils.CassandraBaseCommitter; import cloud.erda.analyzer.common.utils.CassandraSinkHelper; import cloud.erda.analyzer.common.utils.CassandraSinkUtils; import cloud.erda.analyzer.common.utils.StringUtil; +import com.google.common.base.Throwables; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/CassandraSinkUtils.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/CassandraSinkUtils.java index e513aed..36421a9 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/CassandraSinkUtils.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/CassandraSinkUtils.java @@ -21,8 +21,8 @@ import com.datastax.driver.core.Session; import 
com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; -import com.google.common.collect.Maps; import cloud.erda.analyzer.common.utils.annotations.TableConfig; +import com.google.common.collect.Maps; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.cassandra.shaded.com.google.common.reflect.TypeToken; import org.apache.flink.streaming.api.datastream.DataStream; diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/GsonUtil.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/GsonUtil.java deleted file mode 100644 index 38f0ca1..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/GsonUtil.java +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package cloud.erda.analyzer.common.utils; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.reflect.TypeToken; - -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Map; - -public class GsonUtil { - private final static Gson gson = new GsonBuilder().serializeSpecialFloatingPointValues().create(); - - public static T toObject(String value, Class type) { - return gson.fromJson(value, type); - } - - public static ArrayList toArrayList(String value, Class type) { - return gson.fromJson(value,TypeToken.getParameterized(ArrayList.class,type).getType()); - - } - - public static Map toMap(String json, Class keyType, Class valueType) { - return gson.fromJson(json, TypeToken.getParameterized(Map.class, keyType, valueType).getType()); - } - - public static String toJson(Object value) { - return gson.toJson(value); - } - - public static byte[] toJSONBytes(Object value) { - return gson.toJson(value).getBytes(Charset.forName("UTF-8")); - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/Joiners.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/Joiners.java deleted file mode 100644 index 536b442..0000000 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/Joiners.java +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package cloud.erda.analyzer.common.utils; - -import com.google.common.base.Joiner; - -/** - * Desc: join工具类 - * Mail: houly@terminus.io - * Data: 14:22 2018/12/5 - * Author: houly - */ -public class Joiners { - public static final Joiner DOT = Joiner.on(".").skipNulls(); - public static final Joiner COMMA = Joiner.on(",").skipNulls(); - public static final Joiner COLON = Joiner.on(":").skipNulls(); - - private Joiners() { - } -} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/JsonMapperUtils.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/JsonMapperUtils.java new file mode 100644 index 0000000..a2eca32 --- /dev/null +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/JsonMapperUtils.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2021 Terminus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package cloud.erda.analyzer.common.utils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.CollectionType; +import com.fasterxml.jackson.databind.type.MapType; + +import java.io.IOException; +import java.util.*; + +/** + * @author liuhaoyang + * @date 2021/12/29 12:01 + */ +public class JsonMapperUtils { + + private static final ThreadLocal CACHED_OBJECTMAPPER = ThreadLocal.withInitial(ObjectMapper::new); + + private static ObjectMapper getCachedObjectMapper() { + return CACHED_OBJECTMAPPER.get(); + } + + private static final MapType OBJECT_VALUE_MAP_TYPE = constructMapType(HashMap.class, String.class, Object.class); + private static final MapType STRING_VALUE_MAP_TYPE = constructMapType(HashMap.class, String.class, String.class); + + public static T toObject(byte[] data, Class type) throws IOException { + return getCachedObjectMapper().readValue(data, type); + } + + public static T toObject(String data, Class type) throws IOException { + return getCachedObjectMapper().readValue(data, type); + } + + public static Map toObjectValueMap(byte[] data) throws IOException { + return getCachedObjectMapper().readValue(data, OBJECT_VALUE_MAP_TYPE); + } + + public static Map toObjectValueMap(String data) throws IOException { + return getCachedObjectMapper().readValue(data, OBJECT_VALUE_MAP_TYPE); + } + + public static Map toStringValueMap(byte[] data) throws IOException { + return getCachedObjectMapper().readValue(data, STRING_VALUE_MAP_TYPE); + } + + public static Map toStringValueMap(String data) throws IOException { + return getCachedObjectMapper().readValue(data, STRING_VALUE_MAP_TYPE); + } + + + public static Map toHashMap(byte[] data, Class keyClazz, Class valueClazz) throws IOException { + return getCachedObjectMapper().readValue(data, constructMapType(HashMap.class, keyClazz, valueClazz)); + } + + public static Map toHashMap(String data, Class keyClazz, Class valueClazz) throws IOException { + return 
getCachedObjectMapper().readValue(data, constructMapType(HashMap.class, keyClazz, valueClazz)); + } + + public static byte[] toBytes(T value) throws IOException { + return getCachedObjectMapper().writeValueAsBytes(value); + } + + public static String toStrings(T value) throws IOException { + return getCachedObjectMapper().writeValueAsString(value); + } + + public static , T> L toList(byte[] data, Class listClass, Class valueClazz) throws IOException { + return getCachedObjectMapper().readValue(data, constructCollectionType(listClass, valueClazz)); + } + + public static , T> L toList(String data, Class listClass, Class valueClazz) throws IOException { + return getCachedObjectMapper().readValue(data, constructCollectionType(listClass, valueClazz)); + } + + public static ArrayList toArrayList(byte[] data, Class valueClazz) throws IOException { + return getCachedObjectMapper().readValue(data, constructCollectionType(ArrayList.class, valueClazz)); + } + + public static ArrayList toArrayList(String data, Class valueClazz) throws IOException { + return getCachedObjectMapper().readValue(data, constructCollectionType(ArrayList.class, valueClazz)); + } + + private static MapType constructMapType(Class mapClass, Class keyClass, Class valueClass) { + return getCachedObjectMapper().getTypeFactory().constructMapType(mapClass, keyClass, valueClass); + } + + public static CollectionType constructCollectionType(Class parametrized, Class parameterClasses) { + return getCachedObjectMapper().getTypeFactory().constructCollectionType(parametrized, parameterClasses); + } +} diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/MapUtils.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/MapUtils.java index 81dc1d7..ec336d2 100644 --- a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/MapUtils.java +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/MapUtils.java @@ -24,7 +24,7 @@ */ public class MapUtils { - public 
static TValue getByAnyKey(Map map, TKey... keys) { + public static TValue getByAnyKeys(Map map, TKey... keys) { for (TKey key : keys) { TValue value = map.get(key); if (value != null) { @@ -34,7 +34,7 @@ public static TValue getByAnyKey(Map map, TKey... k return null; } - public static boolean containsAnyKey(Map map, TKey... keys) { + public static boolean containsAnyKeys(Map map, TKey... keys) { for (TKey key : keys) { if (map.containsKey(key)) { return true; @@ -42,4 +42,13 @@ public static boolean containsAnyKey(Map map, TKey. } return false; } + + public static boolean containsAllKeys(Map map, TKey... keys) { + for (TKey key : keys) { + if (!map.containsKey(key)) { + return false; + } + } + return true; + } } diff --git a/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/StringBuilderUtils.java b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/StringBuilderUtils.java new file mode 100644 index 0000000..0498186 --- /dev/null +++ b/analyzer-common/src/main/java/cloud/erda/analyzer/common/utils/StringBuilderUtils.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2021 Terminus, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package cloud.erda.analyzer.common.utils; + +/** + * @author liuhaoyang + * @date 2021/12/28 16:20 + */ +public class StringBuilderUtils { + + private static final int DEFAULT_STRING_BUILDER_SIZE = 1024; + private static final ThreadLocal CACHED_STRINGBUILDERS = + ThreadLocal.withInitial(() -> new StringBuilder(DEFAULT_STRING_BUILDER_SIZE)); + + public static StringBuilder getCachedStringBuilder() { + StringBuilder sb = CACHED_STRINGBUILDERS.get(); + sb.setLength(0); + return sb; + } +} diff --git a/analyzer-error-insight/src/main/java/cloud/erda/analyzer/errorInsight/functions/ErrorEvent2ErdaEventMapper.java b/analyzer-error-insight/src/main/java/cloud/erda/analyzer/errorInsight/functions/ErrorEvent2ErdaEventMapper.java index ef8ced6..b25d95d 100644 --- a/analyzer-error-insight/src/main/java/cloud/erda/analyzer/errorInsight/functions/ErrorEvent2ErdaEventMapper.java +++ b/analyzer-error-insight/src/main/java/cloud/erda/analyzer/errorInsight/functions/ErrorEvent2ErdaEventMapper.java @@ -5,10 +5,9 @@ import cloud.erda.analyzer.common.constant.ExceptionConstants; import cloud.erda.analyzer.common.models.*; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.errorInsight.model.ErrorEvent; import lombok.var; -import net.minidev.json.JSONArray; -import net.minidev.json.JSONObject; import org.apache.flink.api.common.functions.MapFunction; import java.util.HashMap; @@ -28,7 +27,7 @@ public Event map(ErrorEvent value) throws Exception { erdaEvent.setKind(EventKind.EVENT_KIND_EXCEPTION); erdaEvent.setTimeUnixNano(value.getTimestamp()); erdaEvent.setName(EventNameConstants.EXCEPTION); - erdaEvent.setMessage(JSONArray.toJSONString(value.getStacks())); + erdaEvent.setMessage(JsonMapperUtils.toStrings(value.getStacks())); Relation relation = new Relation(); relation.setResID(value.getErrorId()); @@ -38,10 +37,10 @@ public Event map(ErrorEvent value) throws Exception { HashMap attributes = new HashMap<>(); 
attributes.put(ExceptionConstants.REQUEST_ID, value.getRequestId()); attributes.put(ExceptionConstants.TERMINUS_KEY, value.getTags().getOrDefault("terminus_key", "defaultKey")); - attributes.put(ExceptionConstants.META_DATA, JSONObject.toJSONString(value.getMetaData())); - attributes.put(ExceptionConstants.TAGS, JSONObject.toJSONString(value.getTags())); - attributes.put(ExceptionConstants.REQUEST_CONTEXT, JSONObject.toJSONString(value.getRequestContext())); - attributes.put(ExceptionConstants.REQUEST_HEADERS, JSONObject.toJSONString(value.getRequestHeaders())); + attributes.put(ExceptionConstants.META_DATA, JsonMapperUtils.toStrings(value.getMetaData())); + attributes.put(ExceptionConstants.TAGS, JsonMapperUtils.toStrings(value.getTags())); + attributes.put(ExceptionConstants.REQUEST_CONTEXT, JsonMapperUtils.toStrings(value.getRequestContext())); + attributes.put(ExceptionConstants.REQUEST_HEADERS, JsonMapperUtils.toStrings(value.getRequestHeaders())); erdaEvent.setAttributes(attributes); return erdaEvent; diff --git a/analyzer-metrics/src/main/java/cloud/erda/analyzer/metrics/Main.java b/analyzer-metrics/src/main/java/cloud/erda/analyzer/metrics/Main.java index 5a38f71..211e21e 100644 --- a/analyzer-metrics/src/main/java/cloud/erda/analyzer/metrics/Main.java +++ b/analyzer-metrics/src/main/java/cloud/erda/analyzer/metrics/Main.java @@ -14,23 +14,26 @@ package cloud.erda.analyzer.metrics; -import cloud.erda.analyzer.common.schemas.MetricEventSerializeFunction; -import cloud.erda.analyzer.common.schemas.StringMetricEventSchema; -import cloud.erda.analyzer.metrics.functions.*; -import cloud.erda.analyzer.runtime.MetricRuntime; -import cloud.erda.analyzer.runtime.functions.*; -import cloud.erda.analyzer.runtime.models.*; -import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer; import cloud.erda.analyzer.common.constant.Constants; import cloud.erda.analyzer.common.functions.MetricEventCorrectFunction; import 
cloud.erda.analyzer.common.models.MetricEvent; import cloud.erda.analyzer.common.schemas.MetricEventSchema; import cloud.erda.analyzer.common.utils.ExecutionEnv; import cloud.erda.analyzer.common.watermarks.MetricWatermarkExtractor; +import cloud.erda.analyzer.metrics.functions.*; +import cloud.erda.analyzer.runtime.MetricRuntime; +import cloud.erda.analyzer.runtime.functions.MetricAlertSelectFunction; +import cloud.erda.analyzer.runtime.functions.MetricEventSelectFunction; +import cloud.erda.analyzer.runtime.functions.MetricSelectOutputProcessFunction; +import cloud.erda.analyzer.runtime.models.AggregatedMetricEvent; +import cloud.erda.analyzer.runtime.models.Expression; +import cloud.erda.analyzer.runtime.models.ExpressionFunction; +import cloud.erda.analyzer.runtime.models.ExpressionMetadata; import cloud.erda.analyzer.runtime.sources.AlertExpressionMetadataReader; -import cloud.erda.analyzer.runtime.sources.MetricExpressionMetadataReader; import cloud.erda.analyzer.runtime.sources.FlinkMysqlAppendSource; +import cloud.erda.analyzer.runtime.sources.MetricExpressionMetadataReader; import cloud.erda.analyzer.runtime.utils.OutputTagUtils; +import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; @@ -126,12 +129,12 @@ public static void main(String[] args) throws Exception { .flatMap(new MetricEventSelectFunction()) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) .name("Map metric output to metricEvent") - .flatMap(new MetricEventSerializeFunction()) - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) +// .flatMap(new MetricEventSerializeFunction()) +// .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .addSink(new FlinkKafkaProducer<>( parameterTool.getRequired(Constants.KAFKA_BROKERS), 
parameterTool.getRequired(Constants.TOPIC_METRICS), - new StringMetricEventSchema())) + new MetricEventSchema())) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .name("Push metric output to kafka"); @@ -140,12 +143,12 @@ public static void main(String[] args) throws Exception { .flatMap(new MetricEventSelectFunction()) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) .name("Map metric temp output to metricEvent") - .flatMap(new MetricEventSerializeFunction()) - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) +// .flatMap(new MetricEventSerializeFunction()) +// .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .addSink(new FlinkKafkaProducer<>( parameterTool.getRequired(Constants.KAFKA_BROKERS), parameterTool.getRequired(Constants.TOPIC_METRICS_TEMP), - new StringMetricEventSchema())) + new MetricEventSchema())) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .name("Push metric temp output to kafka"); @@ -155,12 +158,12 @@ public static void main(String[] args) throws Exception { .process(new MetricAlertSelectFunction()) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) .name("Process alert recover_status and map metric to alert.") - .flatMap(new MetricEventSerializeFunction()) - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) +// .flatMap(new MetricEventSerializeFunction()) +// .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .addSink(new FlinkKafkaProducer<>( parameterTool.getRequired(Constants.KAFKA_BROKERS), parameterTool.getRequired(Constants.TOPIC_ALERT), - new StringMetricEventSchema())) + new MetricEventSchema())) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .name("Push alert output to kafka"); diff --git a/analyzer-metrics/src/main/java/cloud/erda/analyzer/metrics/sources/elasticsearch/FlinkElasticsearchSource.java 
b/analyzer-metrics/src/main/java/cloud/erda/analyzer/metrics/sources/elasticsearch/FlinkElasticsearchSource.java deleted file mode 100644 index 5d928f5..0000000 --- a/analyzer-metrics/src/main/java/cloud/erda/analyzer/metrics/sources/elasticsearch/FlinkElasticsearchSource.java +++ /dev/null @@ -1,195 +0,0 @@ -// Copyright (c) 2021 Terminus, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cloud.erda.analyzer.metrics.sources.elasticsearch; - -import com.google.gson.Gson; -import cloud.erda.analyzer.common.constant.Constants; -import cloud.erda.analyzer.common.models.MetricEvent; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.IndicesOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.index.query.QueryBuilders; -import 
org.elasticsearch.search.builder.SearchSourceBuilder; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.concurrent.TimeUnit; - -/** - * { "_index": "spot-machine_summary-full_cluster", "_id": "{{cluster_name}}/{{host_ip}}/{{terminus_version}}" - * "_source": { "name": "machine_summary", "timestamp": 1587957030000000000, "@timestamp": 1587957030000, "tags": { - * "name": "machine_summary", "timestamp": 1587957030000000000, "@timestamp": 1587957030000, "tags": { "cluster_name": - * "terminus-dev", "host_ip": "10.168.0.100", "terminus_index_id": "terminus-dev/10.168.0.100/2", "terminus_version": - * "2" }, "fields": { "labels": [ "org-terminus", "offline" ] } } } } - */ - -/** - * @author randomnil - */ -@Slf4j -public class FlinkElasticsearchSource extends AbstractRichFunction implements SourceFunction { - - private final static long DEFAULT_INTERVAL = 60000; - - private ParameterTool parameterTool; - private HttpHost[] httpHosts; - private String index; - private String query; - private final long interval; - private RestHighLevelClient client; - - private final static Gson GSON = new Gson(); - - private volatile boolean isRunning = true; - - public FlinkElasticsearchSource(long interval, String index, String query, ParameterTool parameterTool) { - String esUrl = parameterTool.get(Constants.ES_URL); - String[] esUrls = esUrl.split(","); - this.httpHosts = new HttpHost[esUrls.length]; - for (int i = 0; i < esUrls.length; i++) { - URL url; - try { - url = new URL(esUrls[i]); - } catch (MalformedURLException e) { - String msg = String.format("url %s is not correct", esUrl); - log.error(msg, e); - throw new RuntimeException(msg); - } - - HttpHost httpHost = new HttpHost(url.getHost(), url.getPort()); - this.httpHosts[i] = httpHost; - } - - if (interval <= 0) { - interval = DEFAULT_INTERVAL; - } - this.interval = interval; - this.index = index; - this.query = query; - this.parameterTool = parameterTool; - } - - 
@Override - public void run(SourceContext ctx) throws Exception { - val searchRequest = this.buildSearchRequest(); - - while (isRunning) { - if (this.client == null) { - this.initESClient(); - } - if (this.client == null) { - TimeUnit.MILLISECONDS.sleep(this.interval); - continue; - } - - this.searchRequest(ctx, searchRequest); - TimeUnit.MILLISECONDS.sleep(this.interval); - } - } - - @Override - public void cancel() { - this.isRunning = false; - if (this.client != null) { - try { - this.client.close(); - } catch (IOException e) { - log.error("Close es client error", e); - } - } - } - - private SearchRequest buildSearchRequest() { - val queryBuilder = QueryBuilders.wrapperQuery(this.query); - val searchBuilder = new SearchSourceBuilder().query(queryBuilder); - return new SearchRequest() - .indicesOptions(IndicesOptions.fromOptions( - true, - true, - SearchRequest.DEFAULT_INDICES_OPTIONS.expandWildcardsOpen(), - SearchRequest.DEFAULT_INDICES_OPTIONS.expandWildcardsClosed(), - SearchRequest.DEFAULT_INDICES_OPTIONS - )) - .indices(this.index) - .source(searchBuilder); - } - - private void initESClient() { - val clientBuilder = RestClient.builder(httpHosts); - RestClientFactory restClientFactory = restClientBuilder -> { - restClientBuilder - .setMaxRetryTimeoutMillis(parameterTool.getInt(Constants.ES_MAXRETRY_TIMEOUT, 5 * 60 * 1000)) - .setRequestConfigCallback(builder -> { - builder.setConnectTimeout(parameterTool.getInt(Constants.ES_REQUEST_CONNECT_TIMEOUT, 5000)); - builder.setSocketTimeout(parameterTool.getInt(Constants.ES_REQUEST_SOCKET_TIMEOUT, 4000)); - builder.setConnectionRequestTimeout( - parameterTool.getInt(Constants.ES_REQUEST_CONN_REQUEST_TIMEOUT, 1000)); - return builder; - }); - if (parameterTool.getBoolean(Constants.ES_SECURITY_ENABLE, false)) { - val credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials( - parameterTool.get(Constants.ES_SECURITY_USERNAME), - 
parameterTool.get(Constants.ES_SECURITY_PASSWORD))); - restClientBuilder.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder - .setDefaultCredentialsProvider(credentialsProvider)); - } - }; - restClientFactory.configureRestClientBuilder(clientBuilder); - - val rhlClient = new RestHighLevelClient(clientBuilder); - try { - if (!rhlClient.ping()) { - log.error("There are no reachable Elasticsearch nodes!"); - } else { - this.client = rhlClient; - } - } catch (Exception e) { - log.error("es client ping error.", e); - } - } - - private void searchRequest(SourceContext ctx, SearchRequest searchRequest) { - SearchResponse searchResponse; - try { - searchResponse = this.client.search(searchRequest); - } catch (Exception e) { - log.error("es client search {} error.", searchRequest.toString(), e); - return; - } - - if (searchResponse == null || searchResponse.getHits() == null) { - return; - } - for (val hit : searchResponse.getHits().getHits()) { - val metric = GSON.fromJson(hit.getSourceAsString(), MetricEvent.class); - ctx.collect(metric); - } - } -} diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAlertSelectFunction.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAlertSelectFunction.java index 4c6a93d..7a1ca11 100644 --- a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAlertSelectFunction.java +++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/functions/MetricAlertSelectFunction.java @@ -15,7 +15,7 @@ package cloud.erda.analyzer.runtime.functions; import cloud.erda.analyzer.common.models.MetricEvent; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.runtime.models.AggregatedMetricEvent; import cloud.erda.analyzer.runtime.models.OutputMetricEvent; import cloud.erda.analyzer.common.constant.MetricTagConstants; @@ -77,10 +77,12 @@ public void 
processElement(AggregatedMetricEvent value, Context context, Collect
         if (filter(value)) {
             MetricEvent alert = mapAggregatedMetricEvent(value);
             try {
-                log.info("Collect alert event --> {}", GsonUtil.toJson(alert));
+                // Guard serialization cost behind isInfoEnabled; log the collected
+                // alert (not the raw aggregated `value`) so the message matches out.collect.
+                if (log.isInfoEnabled()) {
+                    log.info("Collect alert event --> {}", JsonMapperUtils.toStrings(alert));
+                }
                 out.collect(alert);
             } catch (Throwable throwable) {
-                log.error("Cannot collect alertEvent from {} , tags {}", value.getMetric().getName(), GsonUtil.toJson(alert.getTags()), throwable);
+                log.error("Cannot collect alertEvent from {} , tags {}", value.getMetric().getName(), JsonMapperUtils.toStrings(alert.getTags()), throwable);
                 throw throwable;
             }
         }
diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/models/ExpressionFunction.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/models/ExpressionFunction.java
index 2d83301..af61baf 100644
--- a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/models/ExpressionFunction.java
+++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/models/ExpressionFunction.java
@@ -14,8 +14,8 @@
 
 package cloud.erda.analyzer.runtime.models;
 
-import com.google.gson.annotations.SerializedName;
 import lombok.Data;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
  * @author: liuhaoyang
@@ -30,14 +30,14 @@ public class ExpressionFunction {
 
     private String field;
 
-    @SerializedName(value = "field_script")
+    @JsonProperty("field_script")
     private String fieldScript;
 
     private String operator;
 
     private Object value;
 
-    @SerializedName(value = "value_script")
+    @JsonProperty("value_script")
     private String valueScript;
 
     private ExpressionFunctionTrigger trigger;
diff --git a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/sources/ExpressionMetadataReader.java b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/sources/ExpressionMetadataReader.java
index 9064f30..88b04f9 100644
---
a/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/sources/ExpressionMetadataReader.java +++ b/analyzer-runtime/src/main/java/cloud/erda/analyzer/runtime/sources/ExpressionMetadataReader.java @@ -17,13 +17,13 @@ package cloud.erda.analyzer.runtime.sources; import cloud.erda.analyzer.common.constant.ExpressionConstants; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.runtime.expression.filters.FilterOperatorDefine; import cloud.erda.analyzer.runtime.models.*; -import cloud.erda.analyzer.runtime.sources.DataRowReader; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; @@ -41,7 +41,7 @@ public abstract class ExpressionMetadataReader implements DataRowReader { @Override - public ExpressionMetadata read(ResultSet resultSet) throws SQLException { + public ExpressionMetadata read(ResultSet resultSet) throws SQLException, IOException { try { ExpressionMetadata metadata = new ExpressionMetadata(); @@ -193,9 +193,9 @@ private void checkExpression(ExpressionMetadata metadata) { checkNotNull(expression.getAlias(), "Expression alias cannot be null"); } - private Map parseJsonField(String value, String field, String matedataId, Class keyClass, Class valueClass) { + private Map parseJsonField(String value, String field, String matedataId, Class keyClass, Class valueClass) throws IOException { try { - Map map = GsonUtil.toMap(value, keyClass, valueClass); + Map map = JsonMapperUtils.toHashMap(value, keyClass, valueClass); return map; } catch (Throwable throwable) { log.warn("Parse json field fail. 
metadata {}, field {}\n {}", matedataId, field, value); @@ -203,9 +203,9 @@ private Map parseJsonField(String value, String field, String mated } } - private V parseJsonField(String value, String field, String matedataId, Class valueClass) { + private V parseJsonField(String value, String field, String matedataId, Class valueClass) throws IOException { try { - V ins = GsonUtil.toObject(value, valueClass); + V ins = JsonMapperUtils.toObject(value, valueClass); return ins; } catch (Throwable throwable) { log.warn("Parse json field fail. metadata {}, field {}\n {}", matedataId, field, value); diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/Main.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/Main.java index be28cec..7446cc6 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/Main.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/Main.java @@ -18,10 +18,7 @@ import cloud.erda.analyzer.common.functions.MetricEventCorrectFunction; import cloud.erda.analyzer.common.models.MetricEvent; import cloud.erda.analyzer.common.schemas.MetricEventSchema; -import cloud.erda.analyzer.common.schemas.MetricEventSerializeFunction; -import cloud.erda.analyzer.common.schemas.StringMetricEventSchema; import cloud.erda.analyzer.common.utils.ExecutionEnv; -import cloud.erda.analyzer.common.utils.StringUtil; import cloud.erda.analyzer.common.watermarks.BoundedOutOfOrdernessWatermarkGenerator; import cloud.erda.analyzer.common.watermarks.MetricWatermarkExtractor; import cloud.erda.analyzer.runtime.MetricRuntime; @@ -44,7 +41,6 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; -import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor; import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; @@ -81,32 +77,20 @@ public static void main(String[] args) throws Exception { .setParallelism(parameterTool.getInt(STREAM_PARALLELISM_INPUT)) .name("filter span not null") .assignTimestampsAndWatermarks(WatermarkStrategy - .forGenerator(new BoundedOutOfOrdernessWatermarkGenerator(Duration.ofSeconds(10))) + .forGenerator(new BoundedOutOfOrdernessWatermarkGenerator(Duration.ofSeconds(1))) .withTimestampAssigner(new SpanTimestampAssigner())) .name("span consumer watermark") .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_INPUT)); - DataStream serviceStream = spanStream - .filter(new SpanServiceTagCheckFunction()) - .name("check whether the tag of the service exists") - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) - .keyBy(new SpanServiceGroupFunction()) - .window(TumblingEventTimeWindows.of(Time.seconds(10))) - .reduce(new SpanServiceReduceFunction()) - .name("reduce span service") - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) - .map(new SpanToServiceMetricFunction()) - .name("map reduced span to service metric") - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)); - - DataStream tranMetricStream = spanStream + // trace analysis + DataStream apmMetricsStream = spanStream .keyBy(Span::getTraceID) .window(EventTimeSessionWindows.withDynamicGap(new TraceAnalysisTimeGapExtractor())) // .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new TraceAnalysisFunction()) .name("trace analysis windows process") .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) - .flatMap(new SlowOrErrorMetricFunction(parameterTool)) + .process(new ServiceAndSlowErrorMetricFunction(parameterTool)) .name("slow or error metric process") 
.setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) .keyBy(new MetricTagGroupFunction()) @@ -115,18 +99,11 @@ public static void main(String[] args) throws Exception { .name("Aggregate metrics field process") .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)); - DataStream tracingMetrics = serviceStream.union(tranMetricStream) - .flatMap(new MetricMetaFunction()) - .name("add metric meta") - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)); - - tracingMetrics - .flatMap(new MetricEventSerializeFunction()) - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) + apmMetricsStream .addSink(new FlinkKafkaProducer<>( parameterTool.getRequired(Constants.KAFKA_BROKERS), - parameterTool.getRequired(Constants.TOPIC_TRACING_METRICS), - new StringMetricEventSchema())) + parameterTool.getRequired(Constants.TOPIC_METRICS), + new MetricEventSchema())) .name("send trace metrics to kafka") .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)); @@ -151,12 +128,11 @@ public static void main(String[] args) throws Exception { .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_INPUT)) .name("spot span consumer watermark"); - DataStream spanMetrics = spanStream.map(new SpanMetricCompatibleFunction()) + DataStream oapSpan = spanStream.map(new SpanMetricCompatibleFunction()) .name("map oap-span to spot span") .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)); - DataStream aggregationMetricEvent = MetricRuntime.run(spanMetrics.union(spotSpan).union(tracingMetrics), metricExpressionQuery, parameterTool); - + DataStream aggregationMetricEvent = MetricRuntime.run(oapSpan.union(spotSpan), metricExpressionQuery, parameterTool); SingleOutputStreamOperator outputMetrics = aggregationMetricEvent .process(new MetricSelectOutputProcessFunction(OutputTagUtils.OutputMetricTag, null, null)) @@ -168,12 +144,12 @@ public static void 
main(String[] args) throws Exception { .flatMap(new MetricEventSelectFunction()) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OPERATOR)) .name("Map metric output to metricEvent") - .flatMap(new MetricEventSerializeFunction()) - .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) +// .flatMap(new MetricEventSerializeFunction()) +// .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .addSink(new FlinkKafkaProducer<>( parameterTool.getRequired(Constants.KAFKA_BROKERS), - parameterTool.getRequired(Constants.TOPIC_METRICS), - new StringMetricEventSchema())) + parameterTool.getRequired(Constants.TOPIC_TRACING_METRICS), + new MetricEventSchema())) .setParallelism(parameterTool.getInt(Constants.STREAM_PARALLELISM_OUTPUT)) .name("Push metric output to kafka"); diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldAggregateFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldAggregateFunction.java index 92c5292..dd607e0 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldAggregateFunction.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldAggregateFunction.java @@ -17,9 +17,11 @@ package cloud.erda.analyzer.tracing.functions; import cloud.erda.analyzer.common.models.MetricEvent; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.AggregateFunction; +import java.io.IOException; import java.io.Serializable; import java.util.Date; import java.util.Map; @@ -42,6 +44,7 @@ public StatsAccumulator add(MetricEvent metricEvent, StatsAccumulator statsAccum return statsAccumulator; } + @SneakyThrows @Override public MetricEvent getResult(StatsAccumulator statsAccumulator) { return statsAccumulator.getResult(); diff --git 
a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldProcessFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldProcessFunction.java index 85847c3..2377d4b 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldProcessFunction.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricFieldProcessFunction.java @@ -17,7 +17,7 @@ package cloud.erda.analyzer.tracing.functions; import cloud.erda.analyzer.common.models.MetricEvent; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -39,7 +39,7 @@ public void process(String s, ProcessWindowFunction { +public class MetricMetaFunction implements MapFunction { + @Override - public void flatMap(MetricEvent metricEvent, Collector collector) throws Exception { - if (metricEvent == null) { - return; - } + public MetricEvent map(MetricEvent metricEvent) throws Exception { metricEvent.addTag(SpanConstants.META, SpanConstants.TRUE); metricEvent.addTag(SpanConstants.METRIC_SCOPE, SpanConstants.METRIC_SCOPE_MICRO_SERVICE); - metricEvent.addTag(SpanConstants.METRIC_SCOPE_ID, MapUtils.getByAnyKey(metricEvent.getTags(), SpanConstants.ENV_ID, SpanConstants.TERMINUS_KEY)); - collector.collect(metricEvent); + metricEvent.addTag(SpanConstants.METRIC_SCOPE_ID, MapUtils.getByAnyKeys(metricEvent.getTags(), SpanConstants.ENV_ID, SpanConstants.TERMINUS_KEY)); + return metricEvent; } } diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricTagGroupFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricTagGroupFunction.java index e3ffcd2..b1c12ec 100644 --- 
a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricTagGroupFunction.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/MetricTagGroupFunction.java @@ -16,7 +16,10 @@ package cloud.erda.analyzer.tracing.functions; +import cloud.erda.analyzer.common.constant.SpanConstants; import cloud.erda.analyzer.common.models.MetricEvent; +import cloud.erda.analyzer.common.utils.MapUtils; +import cloud.erda.analyzer.common.utils.StringBuilderUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.digest.DigestUtils; import org.apache.flink.api.java.functions.KeySelector; @@ -33,17 +36,40 @@ public class MetricTagGroupFunction implements KeySelector { @Override public String getKey(MetricEvent metricEvent) throws Exception { - StringBuilder sb = new StringBuilder(); - sb.append(metricEvent.getName()); - Map sortedTags = new TreeMap<>(metricEvent.getTags()); - for (Map.Entry tag : sortedTags.entrySet()) { - sb.append("_").append(tag.getKey()).append("_").append(tag.getValue()); + StringBuilder sb = StringBuilderUtils.getCachedStringBuilder(); + sb.append(metricEvent.getName()).append("_"); + + sb.append(metricEvent.getTags().get(SpanConstants.ENV_ID)).append("_"); + sb.append(metricEvent.getTags().get(SpanConstants.SERVICE_ID)).append("_"); + sb.append(metricEvent.getTags().get(SpanConstants.SERVICE_INSTANCE_ID)).append("_"); + + String spanLayer = metricEvent.getTags().getOrDefault(SpanConstants.SPAN_LAYER, SpanConstants.SPAN_LAYER_UNKNOWN); + sb.append(spanLayer).append("_"); + + switch (spanLayer) { + case SpanConstants.SPAN_LAYER_HTTP: + sb.append(MapUtils.getByAnyKeys(metricEvent.getTags(), SpanConstants.TAG_HTTP_PATH, SpanConstants.TAG_HTTP_TARGET, SpanConstants.TAG_HTTP_URL)); + break; + case SpanConstants.SPAN_LAYER_RPC: + sb.append(MapUtils.getByAnyKeys(metricEvent.getTags(), SpanConstants.TAG_RPC_TARGET)); + break; + case SpanConstants.APPLICATION_CACHE: + case SpanConstants.SPAN_LAYER_DB: + 
sb.append(MapUtils.getByAnyKeys(metricEvent.getTags(), SpanConstants.DB_SYSTEM, SpanConstants.DB_TYPE)).append(MapUtils.getByAnyKeys(metricEvent.getTags(), SpanConstants.DB_STATEMENT));
+                break;
+            case SpanConstants.SPAN_LAYER_MQ:
+                sb.append(MapUtils.getByAnyKeys(metricEvent.getTags(), SpanConstants.MESSAGE_BUS_DESTINATION));
+                break;
+            default:
+                sb.append(metricEvent.getTags().get(SpanConstants.OPERATION_NAME));
+                break;
+        }
+
         String series = sb.toString();
-        String md5HexKey = DigestUtils.md5Hex(series);
+//        String md5HexKey = DigestUtils.md5Hex(series);
         if (log.isDebugEnabled()) {
-            log.debug("metric series = {} . md5HexKey = {}", series, md5HexKey);
+            log.debug("FieldAggregator group metric series = {} md5HexKey = {}", series, "");
         }
-        return md5HexKey;
+        return series;
     }
 }
diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/ServiceAndSlowErrorMetricFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/ServiceAndSlowErrorMetricFunction.java
new file mode 100644
index 0000000..aa06f01
--- /dev/null
+++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/ServiceAndSlowErrorMetricFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright (c) 2021 Terminus, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cloud.erda.analyzer.tracing.functions;
+
+import cloud.erda.analyzer.common.constant.SpanConstants;
+import cloud.erda.analyzer.common.models.MetricEvent;
+import cloud.erda.analyzer.common.utils.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Emits derived "_slow" / "_error" metrics for known application metrics and
+ * stamps meta/scope tags on every event before re-emitting it.
+ *
+ * @author liuhaoyang
+ * @date 2021/9/23 10:20
+ */
+public class ServiceAndSlowErrorMetricFunction extends ProcessFunction<MetricEvent, MetricEvent> {
+
+    // Maps a metric name to the config key holding its slow-call threshold.
+    private static final Map<String, String> paramKeys;
+
+    static {
+        paramKeys = new HashMap<>();
+        paramKeys.put(SpanConstants.APPLICATION_HTTP, "application.slow.http");
+        paramKeys.put(SpanConstants.APPLICATION_RPC, "application.slow.rpc");
+        paramKeys.put(SpanConstants.APPLICATION_CACHE, "application.slow.cache");
+        paramKeys.put(SpanConstants.APPLICATION_DB, "application.slow.db");
+        paramKeys.put("default", "application.slow.default");
+    }
+
+    private final ParameterTool parameterTool;
+
+    public ServiceAndSlowErrorMetricFunction(ParameterTool parameterTool) {
+        this.parameterTool = parameterTool;
+    }
+
+    @Override
+    public void processElement(MetricEvent metricEvent, ProcessFunction<MetricEvent, MetricEvent>.Context context, Collector<MetricEvent> collector) throws Exception {
+
+        if (paramKeys.containsKey(metricEvent.getName())) {
+            long slowDefault = parameterTool.getLong(paramKeys.get("default"));
+            long metricSlow = parameterTool.getLong(paramKeys.get(metricEvent.getName()), slowDefault);
+
+            if ((long) metricEvent.getFields().get(SpanConstants.ELAPSED) > metricSlow) {
+                MetricEvent slowMetric = metricEvent.copy();
+                slowMetric.setName(metricEvent.getName() + "_slow");
+                collector.collect(slowMetric);
+            }
+
+            String tagError = metricEvent.getTags().get(SpanConstants.ERROR);
+            if (StringUtils.isNotEmpty(tagError) && tagError.equalsIgnoreCase(SpanConstants.TRUE)) {
+                MetricEvent errorMetric = metricEvent.copy();
+                errorMetric.setName(metricEvent.getName() + "_error");
+                collector.collect(errorMetric);
+            }
+            metricEvent.getTags().remove(SpanConstants.TRACE_ID);
+            metricEvent.getTags().remove(SpanConstants.REQUEST_ID);
+            metricEvent.getTags().remove(SpanConstants.TRACE_SAMPLED);
+        }
+        metricEvent.addTag(SpanConstants.META, SpanConstants.TRUE);
+        metricEvent.addTag(SpanConstants.METRIC_SCOPE, SpanConstants.METRIC_SCOPE_MICRO_SERVICE);
+        metricEvent.addTag(SpanConstants.METRIC_SCOPE_ID, MapUtils.getByAnyKeys(metricEvent.getTags(), SpanConstants.ENV_ID, SpanConstants.TERMINUS_KEY));
+        collector.collect(metricEvent);
+    }
+}
diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SlowOrErrorMetricFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SlowOrErrorMetricFunction.java
deleted file mode 100644
index e0e5cc0..0000000
--- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SlowOrErrorMetricFunction.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Copyright (c) 2021 Terminus, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package cloud.erda.analyzer.tracing.functions; - -import cloud.erda.analyzer.common.constant.SpanConstants; -import cloud.erda.analyzer.common.models.MetricEvent; -import cloud.erda.analyzer.common.utils.StringUtil; -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.util.Collector; - -import java.util.HashMap; -import java.util.Map; - -/** - * @author liuhaoyang - * @date 2021/9/23 10:20 - */ -public class SlowOrErrorMetricFunction implements FlatMapFunction { - - private static final Map parmaKeys; - - static { - parmaKeys = new HashMap<>(); - parmaKeys.put(SpanConstants.APPLICATION_HTTP, "application.slow.http"); - parmaKeys.put(SpanConstants.APPLICATION_RPC, "application.slow.rpc"); - parmaKeys.put(SpanConstants.APPLICATION_CACHE, "application.slow.cache"); - parmaKeys.put(SpanConstants.APPLICATION_DB, "application.slow.db"); - parmaKeys.put("default", "application.slow.default"); - } - - private final ParameterTool parameterTool; - - public SlowOrErrorMetricFunction(ParameterTool parameterTool) { - this.parameterTool = parameterTool; - } - - @Override - public void flatMap(MetricEvent metricEvent, Collector collector) throws Exception { - - long slowDefault = parameterTool.getLong(parmaKeys.get("default")); - long metricSlow = slowDefault; - - if (parmaKeys.containsKey(metricEvent.getName())) { - metricSlow = parameterTool.getLong(parmaKeys.get(metricEvent.getName()), slowDefault); - } - - if ((long) metricEvent.getFields().get(SpanConstants.ELAPSED) > metricSlow) { - MetricEvent slowMetric = metricEvent.copy(); - slowMetric.setName(metricEvent.getName() + "_slow"); - collector.collect(slowMetric); - } - - String tagError = metricEvent.getTags().get(SpanConstants.ERROR); - if (StringUtils.isNotEmpty(tagError) && tagError.equalsIgnoreCase(SpanConstants.TRUE)) { - MetricEvent errorMetric = metricEvent.copy(); - 
errorMetric.setName(metricEvent.getName() + "_error"); - collector.collect(errorMetric); - } - - metricEvent.getTags().remove(SpanConstants.TRACE_ID); - metricEvent.getTags().remove(SpanConstants.REQUEST_ID); - metricEvent.getTags().remove(SpanConstants.TRACE_SAMPLED); - collector.collect(metricEvent); - } -} diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanCorrectFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanCorrectFunction.java index 8d57a0a..7cd622e 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanCorrectFunction.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanCorrectFunction.java @@ -44,20 +44,20 @@ private String getSpanLayer(Span span) { return span.getAttributes().get(SpanConstants.SPAN_LAYER); } - if (MapUtils.containsAnyKey(span.getAttributes(), SpanConstants.TAG_HTTP_PATH, SpanConstants.TAG_HTTP_URL, SpanConstants.TAG_HTTP_TARGET)) { + if (MapUtils.containsAnyKeys(span.getAttributes(), SpanConstants.TAG_HTTP_PATH, SpanConstants.TAG_HTTP_URL, SpanConstants.TAG_HTTP_TARGET)) { return SpanConstants.SPAN_LAYER_HTTP; } - if (MapUtils.containsAnyKey(span.getAttributes(), SpanConstants.TAG_RPC_TARGET, SpanConstants.TAG_RPC_SERVICE, SpanConstants.TAG_RPC_METHOD, SpanConstants.TAG_DUBBO_SERVICE, SpanConstants.TAG_DUBBO_METHOD)) { + if (MapUtils.containsAnyKeys(span.getAttributes(), SpanConstants.TAG_RPC_TARGET, SpanConstants.TAG_RPC_SERVICE, SpanConstants.TAG_RPC_METHOD, SpanConstants.TAG_DUBBO_SERVICE, SpanConstants.TAG_DUBBO_METHOD)) { return SpanConstants.SPAN_LAYER_RPC; } - if (MapUtils.containsAnyKey(span.getAttributes(), SpanConstants.MESSAGE_BUS_DESTINATION)) { + if (MapUtils.containsAnyKeys(span.getAttributes(), SpanConstants.MESSAGE_BUS_DESTINATION)) { return SpanConstants.SPAN_LAYER_MQ; } - if (MapUtils.containsAnyKey(span.getAttributes(), SpanConstants.DB_STATEMENT)) { - String dbType = 
MapUtils.getByAnyKey(span.getAttributes(), SpanConstants.DB_SYSTEM, SpanConstants.DB_TYPE); + if (MapUtils.containsAnyKeys(span.getAttributes(), SpanConstants.DB_STATEMENT)) { + String dbType = MapUtils.getByAnyKeys(span.getAttributes(), SpanConstants.DB_SYSTEM, SpanConstants.DB_TYPE); if (dbType != null) { if (SpanConstants.DB_TYPE_REDIS.equalsIgnoreCase(dbType)) { return SpanConstants.SPAN_LAYER_CACHE; diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanSchema.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanSchema.java index e263ea1..5fe5207 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanSchema.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanSchema.java @@ -16,8 +16,8 @@ package cloud.erda.analyzer.tracing.functions; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.tracing.model.Span; -import com.google.gson.Gson; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.slf4j.Logger; @@ -32,12 +32,11 @@ public class SpanSchema implements DeserializationSchema { private final static Logger logger = LoggerFactory.getLogger(SpanSchema.class); - private final static Gson gson = new Gson(); @Override public Span deserialize(byte[] bytes) throws IOException { try { - Span span = gson.fromJson(new String(bytes), Span.class); + Span span = JsonMapperUtils.toObject(bytes, Span.class); return span; } catch (Exception e) { logger.error("Deserialize SpanEvent fail ", e); diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceGroupFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceGroupFunction.java index 6b72b05..e2ef0d8 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceGroupFunction.java +++ 
b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceGroupFunction.java @@ -17,9 +17,12 @@ package cloud.erda.analyzer.tracing.functions; import cloud.erda.analyzer.common.constant.SpanConstants; +import cloud.erda.analyzer.common.utils.StringBuilderUtils; import cloud.erda.analyzer.tracing.model.Span; import org.apache.flink.api.java.functions.KeySelector; +import java.util.Map; + /** * @author liuhaoyang * @date 2021/9/21 01:00 @@ -28,10 +31,11 @@ public class SpanServiceGroupFunction implements KeySelector { @Override public String getKey(Span span) throws Exception { - return span.getAttributes().get(SpanConstants.ENV_ID) - + ":" - + span.getAttributes().get(SpanConstants.SERVICE_ID) - + ":" - + span.getAttributes().get(SpanConstants.SERVICE_INSTANCE_ID); + Map attributes = span.getAttributes(); + StringBuilder sb = StringBuilderUtils.getCachedStringBuilder(); + sb.append(attributes.get(SpanConstants.ENV_ID)); + sb.append(attributes.get(SpanConstants.SERVICE_ID)); + sb.append(attributes.get(SpanConstants.SERVICE_INSTANCE_ID)); + return sb.toString(); } } diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceTagCheckFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceTagCheckFunction.java index 75b938e..525373e 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceTagCheckFunction.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanServiceTagCheckFunction.java @@ -17,6 +17,7 @@ package cloud.erda.analyzer.tracing.functions; import cloud.erda.analyzer.common.constant.SpanConstants; +import cloud.erda.analyzer.common.utils.MapUtils; import cloud.erda.analyzer.tracing.model.Span; import org.apache.flink.api.common.functions.FilterFunction; @@ -30,9 +31,6 @@ public class SpanServiceTagCheckFunction implements FilterFunction { @Override public boolean filter(Span span) throws Exception { 
- Map attributes = span.getAttributes(); - return attributes.containsKey(SpanConstants.ENV_ID) - && attributes.containsKey(SpanConstants.SERVICE_ID) - && attributes.containsKey(SpanConstants.SERVICE_NAME); + return MapUtils.containsAllKeys(span.getAttributes(), SpanConstants.ENV_ID, SpanConstants.SERVICE_ID, SpanConstants.SERVICE_NAME); } } diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanToServiceMetricFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanToServiceMetricFunction.java index e0c2044..c8e4ea3 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanToServiceMetricFunction.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/SpanToServiceMetricFunction.java @@ -18,7 +18,7 @@ import cloud.erda.analyzer.common.constant.SpanConstants; import cloud.erda.analyzer.common.models.MetricEvent; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.tracing.model.Span; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.MapFunction; @@ -45,19 +45,10 @@ public MetricEvent map(Span span) throws Exception { metricEvent.addTag(SpanConstants.SERVICE_INSTANCE_IP, span.getAttributes().get(SpanConstants.SERVICE_INSTANCE_IP)); metricEvent.addTag(SpanConstants.PROJECT_NAME, span.getAttributes().get(SpanConstants.PROJECT_NAME)); metricEvent.addTag(SpanConstants.WORKSPACE, span.getAttributes().get(SpanConstants.WORKSPACE)); - if (span.getAttributes().containsKey(SpanConstants.JAEGER_VERSION)) { - metricEvent.addTag(SpanConstants.INSTRUMENT_SDK, SpanConstants.JAEGER); - metricEvent.addTag(SpanConstants.INSTRUMENT_SDK_VERSION, span.getAttributes().get(SpanConstants.JAEGER_VERSION)); - } int startTimeCount = 0; - Long startAt = getServiceInstanceStartedAt(span.getAttributes().get(SpanConstants.SERVICE_INSTANCE_STARTED_AT)); - if (startAt != 
null) { - startTimeCount++; - metricEvent.addField(SpanConstants.START_TIME_MEAN, startAt); - } metricEvent.addField(SpanConstants.START_TIME_COUNT, startTimeCount); if (log.isDebugEnabled()) { - log.debug("Map reduced span to service metric @SpanEndTime {}. {} ", new Date(metricEvent.getTimestamp() / 1000000), GsonUtil.toJson(metricEvent)); + log.debug("Map reduced span to service metric @SpanEndTime {}. {} ", new Date(metricEvent.getTimestamp() / 1000000), JsonMapperUtils.toStrings(metricEvent)); } return metricEvent; } diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/StatsAccumulator.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/StatsAccumulator.java index ffe89e3..03c2c07 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/StatsAccumulator.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/StatsAccumulator.java @@ -17,12 +17,10 @@ package cloud.erda.analyzer.tracing.functions; import cloud.erda.analyzer.common.models.MetricEvent; -import cloud.erda.analyzer.common.utils.ConvertUtils; -import cloud.erda.analyzer.common.utils.GsonUtil; -import lombok.Data; -import lombok.Getter; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import lombok.extern.slf4j.Slf4j; +import java.io.IOException; import java.io.Serializable; import java.util.Date; import java.util.HashMap; @@ -43,14 +41,16 @@ public void apply(MetricEvent metricEvent) { if (metricEvent == null) { return; } - lastMetric = metricEvent; + if (lastMetric == null || metricEvent.getTimestamp() > lastMetric.getTimestamp()) { + lastMetric = metricEvent; + } for (Map.Entry entry : metricEvent.getFields().entrySet()) { FieldAggregator aggregator = aggregators.computeIfAbsent(entry.getKey(), FieldAggregator::new); aggregator.apply(entry.getValue()); } } - public MetricEvent getResult() { + public MetricEvent getResult() throws IOException { if (lastMetric == null) { return null; } @@ 
-65,7 +65,7 @@ public MetricEvent getResult() { metricEvent.addField(aggregator.getName() + "_max", aggregator.getMax()); } if (log.isDebugEnabled()) { - log.debug("transaction metric aggregate @SpanEndTime {} . {}", new Date(metricEvent.getTimestamp() / 1000000), GsonUtil.toJson(metricEvent)); + log.debug("apm metric aggregate @SpanEndTime {} {}", new Date(metricEvent.getTimestamp() / 1000000), JsonMapperUtils.toStrings(metricEvent)); } return metricEvent; } diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisFunction.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisFunction.java index 192c96f..15bc2ec 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisFunction.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisFunction.java @@ -18,7 +18,7 @@ import cloud.erda.analyzer.common.constant.SpanConstants; import cloud.erda.analyzer.common.models.MetricEvent; -import cloud.erda.analyzer.common.utils.GsonUtil; +import cloud.erda.analyzer.common.utils.JsonMapperUtils; import cloud.erda.analyzer.common.utils.StringUtil; import cloud.erda.analyzer.tracing.model.Span; import lombok.extern.slf4j.Slf4j; @@ -26,9 +26,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; +import java.util.*; /** * @author liuhaoyang @@ -40,11 +38,33 @@ public class TraceAnalysisFunction extends ProcessWindowFunction.Context context, Iterable iterable, Collector collector) throws Exception { Map spans = new HashMap<>(); + Map services = new HashMap<>(); + Set parentIds = new HashSet<>(); for (Span span : iterable) { spans.put(span.getSpanID(), span); + if (StringUtil.isNotEmpty(span.getParentSpanID())) { + parentIds.add(span.getParentSpanID()); + } + String serviceKey = getAttribute(span, 
SpanConstants.ENV_ID) + getAttribute(span, SpanConstants.TERMINUS_KEY) + getAttribute(span, SpanConstants.SERVICE_ID) + getAttribute(span, SpanConstants.SERVICE_INSTANCE_ID); + Span lastServiceSpan = services.get(serviceKey); + if (lastServiceSpan == null || span.getEndTimeUnixNano() > lastServiceSpan.getEndTimeUnixNano()) { + services.put(serviceKey, span); + } } - for (Map.Entry entry : spans.entrySet()) { - Span span = entry.getValue(); + + // process service node + for (Span span : services.values()) { + MetricEvent metricEvent = createServiceMetrics(span, SpanConstants.APPLICATION_SERVICE_NODE); + int startTimeCount = 0; + metricEvent.addField(SpanConstants.START_TIME, startTimeCount); + if (log.isDebugEnabled()) { + log.debug("Map span to service metric @SpanEndTime {}. {} ", new Date(metricEvent.getTimestamp() / 1000000), JsonMapperUtils.toStrings(metricEvent)); + } + collector.collect(metricEvent); + } + + // process http&rpc&db&cache&mq call metrics + for (Span span : iterable) { String layer = getAttribute(span, SpanConstants.SPAN_LAYER); String kind = getAttribute(span, SpanConstants.SPAN_KIND); @@ -56,7 +76,7 @@ public void process(String s, ProcessWindowFunction traceSpans, Span span, return metricEvent; } - private MetricEvent createClientMetrics(Span span, String metricName) { + private MetricEvent createClientMetrics(Set parentIds, Span span, String metricName) { + // If the client span has child spans, this span will be ignored + if (parentIds.contains(span.getSpanID())) { + return null; + } MetricEvent metricEvent = new MetricEvent(); metricEvent.setName(metricName); metricEvent.addTag(SpanConstants.SOURCE_SERVICE_ID, getAttribute(span, SpanConstants.SERVICE_ID)); @@ -135,6 +159,21 @@ private MetricEvent createClientMetrics(Span span, String metricName) { return metricEvent; } + private MetricEvent createServiceMetrics(Span span, String metricName) { + MetricEvent metricEvent = new MetricEvent(); + 
metricEvent.setTimestamp(span.getEndTimeUnixNano()); + metricEvent.setName(metricName); + metricEvent.addTag(SpanConstants.ENV_ID, span.getAttributes().get(SpanConstants.ENV_ID)); + metricEvent.addTag(SpanConstants.TERMINUS_KEY, span.getAttributes().get(SpanConstants.ENV_ID)); + metricEvent.addTag(SpanConstants.SERVICE_ID, span.getAttributes().get(SpanConstants.SERVICE_ID)); + metricEvent.addTag(SpanConstants.SERVICE_NAME, span.getAttributes().get(SpanConstants.SERVICE_NAME)); + metricEvent.addTag(SpanConstants.SERVICE_INSTANCE_ID, span.getAttributes().get(SpanConstants.SERVICE_INSTANCE_ID)); + metricEvent.addTag(SpanConstants.SERVICE_INSTANCE_IP, span.getAttributes().get(SpanConstants.SERVICE_INSTANCE_IP)); + metricEvent.addTag(SpanConstants.PROJECT_NAME, span.getAttributes().get(SpanConstants.PROJECT_NAME)); + metricEvent.addTag(SpanConstants.WORKSPACE, span.getAttributes().get(SpanConstants.WORKSPACE)); + return metricEvent; + } + private String getAttribute(Span span, String key) { return span.getAttributes().get(key); } diff --git a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisTimeGapExtractor.java b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisTimeGapExtractor.java index d634b9c..9a1ec76 100644 --- a/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisTimeGapExtractor.java +++ b/analyzer-tracing/src/main/java/cloud/erda/analyzer/tracing/functions/TraceAnalysisTimeGapExtractor.java @@ -30,7 +30,7 @@ public class TraceAnalysisTimeGapExtractor implements SessionWindowTimeGapExtrac private static final Long ROOT_SPAN_GAP = Time.seconds(3).toMilliseconds(); - private static final Long CHILD_SPAN_GAP = Time.seconds(15).toMilliseconds(); + private static final Long CHILD_SPAN_GAP = Time.seconds(10).toMilliseconds(); @Override public long extract(Span span) { diff --git a/pom.xml b/pom.xml index b88f334..8d6cd31 100644 --- a/pom.xml +++ b/pom.xml @@ -134,11 +134,6 
@@ lombok 1.18.4 - - com.google.code.gson - gson - 2.8.5 - org.apache.flink flink-connector-elasticsearch6_${scala.binary.version}