Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@ public void process(WatchedEvent event) {

MetricsInfo metricsInfo = getContext().getMetricsInfo();
List<MetricsProducer> producers = new ArrayList<>();
producers.add(this);
producers.add(balanceManager.getMetrics());

final TabletGroupWatcher userTableTGW =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.functional;

import static org.apache.accumulo.core.metrics.Metric.LOW_MEMORY;
import static org.apache.accumulo.core.metrics.Metric.SERVER_IDLE;
import static org.apache.accumulo.core.metrics.MetricsInfo.PROCESS_NAME_TAG_KEY;
import static org.apache.accumulo.core.metrics.MetricsInfo.RESOURCE_GROUP_TAG_KEY;
Expand Down Expand Up @@ -61,15 +62,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IdleProcessMetricsIT extends SharedMiniClusterBase {
public class ProcessMetricsIT extends SharedMiniClusterBase {

private static final Logger log = LoggerFactory.getLogger(IdleProcessMetricsIT.class);
private static final Logger log = LoggerFactory.getLogger(ProcessMetricsIT.class);

static final Duration idleProcessInterval = Duration.ofSeconds(10);

public static final String IDLE_RESOURCE_GROUP = "IDLE_PROCESS_TEST";

public static class IdleStopITConfig implements MiniClusterConfigurationCallback {
public static class ProcessMetricsITConfig implements MiniClusterConfigurationCallback {

@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) {
Expand Down Expand Up @@ -128,7 +129,7 @@ protected Duration defaultTimeout() {
@BeforeAll
public static void before() throws Exception {
sink = new TestStatsDSink();
SharedMiniClusterBase.startMiniClusterWithConfig(new IdleStopITConfig());
SharedMiniClusterBase.startMiniClusterWithConfig(new ProcessMetricsITConfig());
}

@AfterAll
Expand All @@ -142,7 +143,7 @@ public static void after() throws Exception {
* tserver.
*/
@Test
public void testIdleStopMetrics() throws Exception {
public void testIdleMetrics() throws Exception {

// should emit the idle metric after the configured duration of GENERAL_IDLE_PROCESS_INTERVAL
Thread.sleep(idleProcessInterval.toMillis());
Expand Down Expand Up @@ -174,6 +175,42 @@ public void testIdleStopMetrics() throws Exception {
});
}

/**
* Test that the low mem process metrics are emitted by all processes
*/
@Test
public void testLowMemMetrics() throws Exception {

// can't test Monitor, Mini doesn't start it
AtomicBoolean sawManager = new AtomicBoolean(false);
AtomicBoolean sawGC = new AtomicBoolean(false);
AtomicBoolean sawCompactor = new AtomicBoolean(false);
AtomicBoolean sawSServer = new AtomicBoolean(false);
AtomicBoolean sawTServer = new AtomicBoolean(false);

Wait.waitFor(() -> {
List<String> statsDMetrics = sink.getLines();
statsDMetrics.stream().filter(line -> line.startsWith(LOW_MEMORY.getName())).peek(log::info)
.map(TestStatsDSink::parseStatsDMetric).forEach(a -> {
String processName = a.getTags().get(PROCESS_NAME_TAG_KEY);
if (ServerId.Type.TABLET_SERVER.name().equals(processName)) {
sawTServer.set(true);
} else if (ServerId.Type.SCAN_SERVER.name().equals(processName)) {
sawSServer.set(true);
} else if (ServerId.Type.COMPACTOR.name().equals(processName)) {
sawCompactor.set(true);
} else if (ServerId.Type.MANAGER.name().equals(processName)) {
sawManager.set(true);
} else if (ServerId.Type.GARBAGE_COLLECTOR.name().equals(processName)) {
sawGC.set(true);
}

});
return sawManager.get() && sawGC.get() && sawCompactor.get() && sawSServer.get()
&& sawTServer.get();
});
}

/**
* Test that before during and after a compaction, the compactor will emit the appropriate value
* for the idle metric.
Expand Down