Skip to content

[FLINK-36932][metrics] Added resource-level metrics for different states/statuses #926

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.flink.kubernetes.operator.metrics;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.util.StringUtils;

import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.math.NumberUtils;

import java.util.Map;
Expand All @@ -37,8 +39,9 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
private final Configuration configuration;

// map(namespace, map(status, set(deployment))
private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> deploymentStatuses =
private final Map<String, Map<JobManagerDeploymentStatus, Set<String>>> jmDeploymentStatuses =
new ConcurrentHashMap<>();
private final Map<String, Map<JobStatus, Set<String>>> jobStatuses = new ConcurrentHashMap<>();
// map(namespace, map(version, set(deployment)))
private final Map<String, Map<String, Set<String>>> deploymentFlinkVersions =
new ConcurrentHashMap<>();
Expand All @@ -53,9 +56,11 @@ public class FlinkDeploymentMetrics implements CustomResourceMetrics<FlinkDeploy
public static final String FLINK_MINOR_VERSION_GROUP_NAME = "FlinkMinorVersion";
public static final String UNKNOWN_VERSION = "UNKNOWN";
public static final String MALFORMED_MINOR_VERSION = "MALFORMED";
public static final String STATUS_GROUP_NAME = "JmDeploymentStatus";
public static final String JM_DEPLOYMENT_STATUS_GROUP_NAME = "JmDeploymentStatus";
public static final String JOB_STATUS_GROUP_NAME = "JobStatus";
public static final String RESOURCE_USAGE_GROUP_NAME = "ResourceUsage";
public static final String COUNTER_NAME = "Count";
public static final String IN_STATUS_NAME = "InStatus";
public static final String CPU_NAME = "Cpu";
public static final String MEMORY_NAME = "Memory";

Expand All @@ -65,27 +70,21 @@ public FlinkDeploymentMetrics(
this.configuration = configuration;
}

@Override
public void onUpdate(FlinkDeployment flinkApp) {
onRemove(flinkApp);

var flinkAppState = flinkApp.getStatus();
var namespace = flinkApp.getMetadata().getNamespace();
var clusterInfo = flinkApp.getStatus().getClusterInfo();
var clusterInfo = flinkAppState.getClusterInfo();
var deploymentName = flinkApp.getMetadata().getName();

deploymentStatuses
.computeIfAbsent(
namespace,
ns -> {
initNamespaceDeploymentCounts(ns);
initNamespaceStatusCounts(ns);
return createDeploymentStatusMap();
})
.get(flinkApp.getStatus().getJobManagerDeploymentStatus())
.add(deploymentName);
initJmDeploymentMetrics(namespace, deploymentName, flinkApp);
initJobMetrics(namespace, deploymentName, flinkApp);

// Full runtime version queried from the JobManager REST API
var flinkVersion =
flinkApp.getStatus()
flinkAppState
.getClusterInfo()
.getOrDefault(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, "");
if (StringUtils.isNullOrWhitespaceOnly(flinkVersion)) {
Expand Down Expand Up @@ -146,12 +145,61 @@ public void onUpdate(FlinkDeployment flinkApp) {
AbstractFlinkService.FIELD_NAME_TOTAL_MEMORY, "0")));
}

private void initJmDeploymentMetrics(
String namespace, String deploymentName, FlinkDeployment flinkApp) {
var currentJmDeploymentStatus = flinkApp.getStatus().getJobManagerDeploymentStatus();

boolean deploymentRegistrationOccurred =
jmDeploymentStatuses
.computeIfAbsent(
namespace,
ns -> {
initNamespaceDeploymentCounts(ns);
initNamespaceJmDeploymentStatusCounts(ns);
return createStatusMapFromEnum(
JobManagerDeploymentStatus.class);
})
.get(currentJmDeploymentStatus)
.add(deploymentName);

if (deploymentRegistrationOccurred) {
initJmDeploymentStatusGauges(namespace, deploymentName);
}
}

private void initJobMetrics(String namespace, String deploymentName, FlinkDeployment flinkApp) {
var jobStatusDetails = flinkApp.getStatus().getJobStatus();
var jobStatus = jobStatusDetails.getState();
if (jobStatus == null) {
return;
}

boolean deploymentRegistrationOccurred =
jobStatuses
.computeIfAbsent(
namespace,
ns -> {
initNamespaceJobStatusCounts(ns);
return createStatusMapFromEnum(JobStatus.class);
})
.get(jobStatus)
.add(deploymentName);

if (deploymentRegistrationOccurred) {
initJobStatusGauges(namespace, deploymentName);
}
}

@Override
public void onRemove(FlinkDeployment flinkApp) {
var namespace = flinkApp.getMetadata().getNamespace();
var name = flinkApp.getMetadata().getName();

if (deploymentStatuses.containsKey(namespace)) {
deploymentStatuses.get(namespace).values().forEach(names -> names.remove(name));
if (jmDeploymentStatuses.containsKey(namespace)) {
jmDeploymentStatuses.get(namespace).values().forEach(names -> names.remove(name));
}
if (jobStatuses.containsKey(namespace)) {
jobStatuses.get(namespace).values().forEach(names -> names.remove(name));
}
if (deploymentFlinkVersions.containsKey(namespace)) {
deploymentFlinkVersions.get(namespace).values().forEach(names -> names.remove(name));
Expand All @@ -176,18 +224,60 @@ private void initNamespaceDeploymentCounts(String ns) {
.gauge(
COUNTER_NAME,
() ->
deploymentStatuses.get(ns).values().stream()
jmDeploymentStatuses.get(ns).values().stream()
.mapToInt(Set::size)
.sum());
}

private void initNamespaceStatusCounts(String ns) {
private void initNamespaceJmDeploymentStatusCounts(String ns) {
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.addGroup(STATUS_GROUP_NAME)
.addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME)
.addGroup(status.toString())
.gauge(COUNTER_NAME, () -> deploymentStatuses.get(ns).get(status).size());
.gauge(COUNTER_NAME, () -> jmDeploymentStatuses.get(ns).get(status).size());
}
}

private void initNamespaceJobStatusCounts(String ns) {
for (JobStatus status : JobStatus.values()) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.addGroup(JOB_STATUS_GROUP_NAME)
.addGroup(status.toString())
.gauge(COUNTER_NAME, () -> jobStatuses.get(ns).get(status).size());
}
}

private void initJmDeploymentStatusGauges(String ns, String deploymentName) {
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.createResourceGroup(configuration, deploymentName)
.addGroup(JM_DEPLOYMENT_STATUS_GROUP_NAME)
.addGroup(status.toString())
.gauge(
IN_STATUS_NAME,
() ->
jmDeploymentStatuses
.get(ns)
.get(status)
.contains(deploymentName)
? 1
: 0);
}
}

private void initJobStatusGauges(String ns, String deploymentName) {
for (JobStatus status : JobStatus.values()) {
parentMetricGroup
.createResourceNamespaceGroup(configuration, FlinkDeployment.class, ns)
.createResourceGroup(configuration, deploymentName)
.addGroup(JOB_STATUS_GROUP_NAME)
.addGroup(status.toString())
.gauge(
IN_STATUS_NAME,
() -> jobStatuses.get(ns).get(status).contains(deploymentName) ? 1 : 0);
}
}

Expand Down Expand Up @@ -231,9 +321,10 @@ private void initNamespaceMemoryUsage(String ns) {
.reduce(0L, Long::sum));
}

private Map<JobManagerDeploymentStatus, Set<String>> createDeploymentStatusMap() {
Map<JobManagerDeploymentStatus, Set<String>> statuses = new ConcurrentHashMap<>();
for (JobManagerDeploymentStatus status : JobManagerDeploymentStatus.values()) {
private static <T extends Enum<T>> Map<T, Set<String>> createStatusMapFromEnum(
Class<T> statusType) {
Map<T, Set<String>> statuses = new ConcurrentHashMap<>();
for (T status : statusType.getEnumConstants()) {
statuses.put(status, ConcurrentHashMap.newKeySet());
}
return statuses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ public class LifecycleMetrics<CR extends AbstractFlinkResource<?, ?>>

public static final List<Transition> TRACKED_TRANSITIONS = getTrackedTransitions();

// map(tuple(namespace, resource), tracker)
private final Map<Tuple2<String, String>, ResourceLifecycleMetricTracker> lifecycleTrackers =
new ConcurrentHashMap<>();
private final Set<String> namespaces = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<String> resourceNames = Collections.newSetFromMap(new ConcurrentHashMap<>());

private final Clock clock;
private final KubernetesOperatorMetricGroup operatorMetricGroup;
Expand Down Expand Up @@ -104,6 +106,7 @@ public void onUpdate(CR cr) {

@Override
public void onRemove(CR cr) {
resourceNames.remove(cr.getMetadata().getName());
lifecycleTrackers.remove(
Tuple2.of(cr.getMetadata().getNamespace(), cr.getMetadata().getName()));
}
Expand All @@ -120,6 +123,8 @@ private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {
? Instant.parse(cr.getMetadata().getCreationTimestamp())
: clock.instant();
return new ResourceLifecycleMetricTracker(
cr.getMetadata().getNamespace(),
cr.getMetadata().getName(),
initialState,
time,
getTransitionHistograms(cr),
Expand All @@ -129,32 +134,75 @@ private ResourceLifecycleMetricTracker getLifecycleMetricTracker(CR cr) {

private void createNamespaceStateCountIfMissing(CR cr) {
var namespace = cr.getMetadata().getNamespace();
if (!namespaces.add(namespace)) {
return;

if (namespaces.add(namespace)) {
var resourceNamespaceGroup =
operatorMetricGroup.createResourceNamespaceGroup(
config, cr.getClass(), namespace);
MetricGroup lifecycleNamespaceGroup = metricGroupFunction.apply(resourceNamespaceGroup);

for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
lifecycleNamespaceGroup
.addGroup("State")
.addGroup(state.name())
.gauge(
"Count",
() ->
lifecycleTrackers.values().stream()
.filter(
tracker ->
isMatchingTrackerWithState(
tracker, namespace, state))
.count());
}
}

MetricGroup lifecycleGroup =
metricGroupFunction.apply(
operatorMetricGroup.createResourceNamespaceGroup(
config, cr.getClass(), namespace));
for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
lifecycleGroup
.addGroup("State")
.addGroup(state.name())
.gauge(
"Count",
() ->
lifecycleTrackers.entrySet().stream()
.filter(
tracker ->
tracker.getKey().f0.equals(namespace)
&& tracker.getValue()
.getCurrentState()
== state)
.count());
var resourceName = cr.getMetadata().getName();
if (resourceNames.add(resourceName)) {
var resourceNamespaceGroup =
operatorMetricGroup.createResourceNamespaceGroup(
config, cr.getClass(), namespace);
MetricGroup lifecycleResourceGroup =
metricGroupFunction.apply(
resourceNamespaceGroup.createResourceGroup(config, resourceName));

for (ResourceLifecycleState state : ResourceLifecycleState.values()) {
lifecycleResourceGroup
.addGroup("State")
.addGroup(state.name())
.gauge(
"InState",
() ->
lifecycleTrackers.values().stream()
.anyMatch(
tracker ->
isMatchingTrackerWithState(
tracker,
namespace,
resourceName,
state))
? 1
: 0);
}
}
}

private static boolean isMatchingTrackerWithState(
ResourceLifecycleMetricTracker tracker,
String namespace,
ResourceLifecycleState state) {
return tracker.getNamespace().equals(namespace) && tracker.getCurrentState() == state;
}

private static boolean isMatchingTrackerWithState(
ResourceLifecycleMetricTracker tracker,
String namespace,
String resourceName,
ResourceLifecycleState state) {
return isMatchingTrackerWithState(tracker, namespace, state)
&& tracker.getResourceName().equals(resourceName);
}

private synchronized void init(CR cr) {
if (transitionMetrics != null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.metrics.Histogram;

import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +37,8 @@ public class ResourceLifecycleMetricTracker {

private static final Logger LOG = LoggerFactory.getLogger(ResourceLifecycleMetricTracker.class);

@Getter private final String namespace;
@Getter private final String resourceName;
private final Map<String, List<Histogram>> transitionHistos;
private final Map<ResourceLifecycleState, List<Histogram>> stateTimeHistos;

Expand All @@ -45,10 +48,14 @@ public class ResourceLifecycleMetricTracker {
private ResourceLifecycleState currentState;

public ResourceLifecycleMetricTracker(
String namespace,
String resourceName,
ResourceLifecycleState initialState,
Instant time,
Map<String, List<Histogram>> transitionHistos,
Map<ResourceLifecycleState, List<Histogram>> stateTimeHistos) {
this.namespace = namespace;
this.resourceName = resourceName;
this.transitionHistos = transitionHistos;
this.currentState = initialState;
this.stateTimeHistos = stateTimeHistos;
Expand Down
Loading