Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesImpl;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PropertiesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -205,6 +206,7 @@ public boolean acceptsURL(String url) throws SQLException {

@Override
public Connection connect(String url, Properties info) throws SQLException {
long connectionCreationStartTime = EnvironmentEdgeManager.currentTimeMillis();
GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER.increment();
if (!acceptsURL(url)) {
GLOBAL_FAILED_PHOENIX_CONNECTIONS.increment();
Expand All @@ -213,7 +215,7 @@ public Connection connect(String url, Properties info) throws SQLException {
lockInterruptibly(LockMode.READ);
try {
checkClosed();
return createConnection(url, info);
return createConnection(url, info, connectionCreationStartTime);
} catch (SQLException sqlException) {
if (sqlException.getErrorCode() != SQLExceptionCode.NEW_CONNECTION_THROTTLED.getErrorCode()) {
GLOBAL_FAILED_PHOENIX_CONNECTIONS.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.phoenix.jdbc;

import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTION_CREATION_DURATION_MS;
import static org.apache.phoenix.query.QueryServices.QUERY_SERVICES_NAME;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;

import java.sql.Connection;
Expand All @@ -29,8 +31,10 @@
import java.util.logging.Logger;
import javax.annotation.concurrent.Immutable;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.monitoring.connectionqueryservice.ConnectionQueryServicesMetricsManager;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
Expand Down Expand Up @@ -119,31 +123,50 @@ public boolean acceptsURL(String url) throws SQLException {

@Override
public Connection connect(String url, Properties info) throws SQLException {
long connectionCreationStartTime = EnvironmentEdgeManager.currentTimeMillis();
if (!acceptsURL(url)) {
return null;
}

return createConnection(url, info);
return createConnection(url, info, connectionCreationStartTime);
}

protected final Connection createConnection(String url, Properties info) throws SQLException {
protected final Connection createConnection(String url, Properties info,
long connectionCreationStartTime) throws SQLException {
Properties augmentedInfo = PropertiesUtil.deepCopy(info);
augmentedInfo.putAll(getDefaultProps().asMap());
Connection connection = null;
String queryServiceName = null;
if (url.contains("|")) {
// Get HAURLInfo to pass it to connection creation
HAURLInfo haurlInfo = HighAvailabilityGroup.getUrlInfo(url, augmentedInfo);
// High availability connection using two clusters
Optional<HighAvailabilityGroup> haGroup = HighAvailabilityGroup.get(url, augmentedInfo);
if (haGroup.isPresent()) {
return haGroup.get().connect(augmentedInfo, haurlInfo);
connection = haGroup.get().connect(augmentedInfo, haurlInfo);
queryServiceName = haurlInfo.getPrincipal();
setConnectionCreationDurationMetric(queryServiceName, connectionCreationStartTime,
connection);
return connection;
} else {
// If empty HA group is returned, fall back to single cluster.
url = HighAvailabilityGroup.getFallbackCluster(url, info).orElseThrow(
() -> new SQLException("HA group can not be initialized, fallback to single cluster"));
}
}
ConnectionQueryServices cqs = getConnectionQueryServices(url, augmentedInfo);
return cqs.connect(url, augmentedInfo);
connection = cqs.connect(url, augmentedInfo);
queryServiceName = ((PhoenixConnection) connection).getQueryServices().getConfiguration()
.get(QUERY_SERVICES_NAME);
setConnectionCreationDurationMetric(queryServiceName, connectionCreationStartTime, connection);
return connection;
}

private void setConnectionCreationDurationMetric(String queryServiceName,
long connectionCreationTime, Connection connection) {
ConnectionQueryServicesMetricsManager.updateMetrics(queryServiceName,
PHOENIX_CONNECTION_CREATION_DURATION_MS,
EnvironmentEdgeManager.currentTimeMillis() - connectionCreationTime);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ public enum MetricType {
PHOENIX_CONNECTIONS_FAILED_COUNTER("cf",
"Number of client Phoenix Connections Failed to open" + ", not including throttled connections",
LogLevel.OFF, PLong.INSTANCE),
PHOENIX_CONNECTION_CREATION_DURATION_MS("cct",
"Time spent in creating Phoenix connections in milliseconds", LogLevel.OFF, PLong.INSTANCE),
CLIENT_METADATA_CACHE_MISS_COUNTER("cmcm", "Number of cache misses for the CQSI cache.",
LogLevel.DEBUG, PLong.INSTANCE),
CLIENT_METADATA_CACHE_HIT_COUNTER("cmch", "Number of cache hits for the CQSI cache.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTION_CREATION_DURATION_MS;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -42,7 +43,8 @@ public enum QueryServiceMetrics {
CONNECTION_QUERY_SERVICE_OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER(
OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER),
CONNECTION_QUERY_SERVICE_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(
PHOENIX_CONNECTIONS_THROTTLED_COUNTER);
PHOENIX_CONNECTIONS_THROTTLED_COUNTER),
CONNECTION_QUERY_SERVICE_CONNECTION_CREATION_DURATION(PHOENIX_CONNECTION_CREATION_DURATION_MS);

private MetricType metricType;
private ConnectionQueryServicesMetric metric;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ public static void registerMetricsPublisher() {
/**
* This function will be used to add individual MetricType to LocalStore. Also this will serve as
* LocalStore to store connection query service metrics before their current value is added to
* histogram. This func is only used for metrics which are counter based, where values increases
* or decreases frequently. Like Open Conn Counter. This function will first retrieve it's current
* value and increment or decrement (by +/-1) it as required then update the new values. <br>
* Example :- OPEN_PHOENIX_CONNECTIONS_COUNTER, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER <br>
* histogram. In case of counter where values is passed as 1, this function will first retrieve
* it's current value and increment or decrement (by +/-1) it as required then update the new
* values. <br>
* Example :- OPEN_PHOENIX_CONNECTIONS_COUNTER, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER,
* PHOENIX_CONNECTION_CREATION_DURATION_MS<br>
* <br>
* histogram will update with each increment/decrement.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ public static Map<String, List<HistogramDistribution>> getAllConnectionQueryServ
}

public static Map<String, List<ConnectionQueryServicesMetric>>
getAllConnectionQueryServicesCounters() {
getAllConnectionQueryServicesMetrics() {
return ConnectionQueryServicesMetricsManager.getAllConnectionQueryServicesMetrics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@

import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static org.apache.phoenix.jdbc.HighAvailabilityGroup.PHOENIX_HA_GROUP_ATTR;
import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.PRINCIPAL;
import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair.doTestWhenOneHBaseDown;
import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.doTestBasicOperationsWithConnection;
import static org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.getHighAvailibilityGroup;
import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED;
import static org.apache.phoenix.util.PhoenixRuntime.clearAllConnectionQueryServiceMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
Expand All @@ -37,11 +42,18 @@
import java.util.*;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.ClusterRoleRecord.RegistryType;
import org.apache.phoenix.monitoring.ConnectionQueryServicesMetric;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.ConfigurationFactory;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.ConnectionQueryServicesImpl;
import org.apache.phoenix.util.InstanceResolver;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -88,6 +100,24 @@ public class HAConnectionWithMasterAndRPCRegistryIT {
public static void setUpBeforeClass() throws Exception {
CLUSTERS.start();
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
InstanceResolver.clearSingletons();
InstanceResolver.getSingleton(ConfigurationFactory.class, new ConfigurationFactory() {
@Override
public Configuration getConfiguration() {
Configuration conf = HBaseConfiguration.create();
conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true));
return conf;
}

@Override
public Configuration getConfiguration(Configuration confToClone) {
Configuration conf = HBaseConfiguration.create();
conf.set(CONNECTION_QUERY_SERVICE_METRICS_ENABLED, String.valueOf(true));
Configuration copy = new Configuration(conf);
copy.addResource(confToClone);
return copy;
}
});
}

@AfterClass
Expand Down Expand Up @@ -566,6 +596,27 @@ public void testAllWrappedConnectionsClosedAfterRegistryChange2() throws Excepti
}
}

@Test
public void testConnectionCreationDuration() throws SQLException {
clearAllConnectionQueryServiceMetrics();
Connection conn = getParallelConnection();
validateConnectionCreationTime(conn);

conn = getFailoverConnection();
validateConnectionCreationTime(conn);
}

private void validateConnectionCreationTime(Connection connection) {
Map<String, List<ConnectionQueryServicesMetric>> metrics =
PhoenixRuntime.getAllConnectionQueryServicesMetrics();
assertNotNull(connection);
for (ConnectionQueryServicesMetric metric : metrics.get(PRINCIPAL)) {
if (metric.getMetricType().equals(MetricType.PHOENIX_CONNECTION_CREATION_DURATION_MS)) {
assertNotEquals(0, metric.getValue());
}
}
}

/**
* Create a failover connection using {@link #failoverClientProperties}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTIONS_THROTTLED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.PHOENIX_CONNECTION_CREATION_DURATION_MS;
import static org.apache.phoenix.query.QueryServices.CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS;
import static org.apache.phoenix.query.QueryServices.CONNECTION_QUERY_SERVICE_METRICS_ENABLED;
import static org.apache.phoenix.query.QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS;
Expand Down Expand Up @@ -240,7 +241,7 @@ private void checkConnectionQueryServiceMetricsValues(String queryServiceName) t
stmt.execute(String.format(CREATE_TABLE_DDL, tableName + "_" + connQueryServiceName));
if (connQueryServiceName.equals(CONN_QUERY_SERVICE_CHECK_CONN_THROTTLE)) {
try (Connection conn1 = DriverManager.getConnection(princURL)) {
assertMetricValues(connQueryServiceName, 2, 0, 0);
assertMetricValues(connQueryServiceName, 2, 0, 0, 0);
assertHistogramMetricsForMutations(connQueryServiceName, 2, 0, 0, 0);
try (Connection conn2 = DriverManager.getConnection(princURL)) {
// This should never execute in this test.
Expand All @@ -253,7 +254,8 @@ private void checkConnectionQueryServiceMetricsValues(String queryServiceName) t
// Open Connection Count : 1
// Open Internal Connection Count : 0
// Connection Throttled Count : 0
assertMetricValues(connQueryServiceName, 1, 0, 0);
// Connection creation duration > 0
assertMetricValues(connQueryServiceName, 1, 0, 0, 0);
assertHistogramMetricsForMutations(connQueryServiceName, 1, 0, 0, 0);
}
} catch (Exception e) {
Expand All @@ -265,7 +267,8 @@ private void checkConnectionQueryServiceMetricsValues(String queryServiceName) t
// Open Connection Count : 0
// Connection Throttled Count : 1
// Open Internal Connection Count : 0
assertMetricValues(queryServiceName, 0, 1, 0);
// Connection creation duration > -1 i.e 0
assertMetricValues(queryServiceName, 0, 1, 0, -1);
// In histogram, we will still have max open connection count as 2
// while rest of the values will be 0.
assertHistogramMetricsForMutations(queryServiceName, 2, 0, 0, 0);
Expand All @@ -274,7 +277,8 @@ private void checkConnectionQueryServiceMetricsValues(String queryServiceName) t
// Open Connection Count : 0
// Connection Throttled Count : 0
// Open Internal Connection Count : 0
assertMetricValues(queryServiceName, 0, 0, 0);
// Connection creation duration > 0
assertMetricValues(queryServiceName, 0, 0, 0, 0);
// In histogram, we will still have max open connection count as 1 while rest of the values
// will be 0.
assertHistogramMetricsForMutations(queryServiceName, 1, 0, 0, 0);
Expand Down Expand Up @@ -320,21 +324,23 @@ public void assertHistogram(HistogramDistribution histo, String histoName, long
* @param o {@link MetricType#OPEN_PHOENIX_CONNECTIONS_COUNTER}
* @param ct {@link MetricType#PHOENIX_CONNECTIONS_THROTTLED_COUNTER}
* @param io {@link MetricType#OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER}
* @param cct {@link MetricType#PHOENIX_CONNECTION_CREATION_DURATION_MS}
*/
public void assertMetricValues(String queryServiceName, int o, int ct, int io) {
public void assertMetricValues(String queryServiceName, int o, int ct, int io, int cct) {
Map<String, List<ConnectionQueryServicesMetric>> listOfMetrics =
PhoenixRuntime.getAllConnectionQueryServicesCounters();
PhoenixRuntime.getAllConnectionQueryServicesMetrics();
/*
* There are 3 metrics which are tracked as part of Phoenix Connection Query Service Metrics.
* There are 4 metrics which are tracked as part of Phoenix Connection Query Service Metrics.
* Defined here : {@link ConnectionQueryServicesMetrics.QueryServiceMetrics}
* OPEN_PHOENIX_CONNECTIONS_COUNTER OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER
* PHOENIX_CONNECTIONS_THROTTLED_COUNTER
* PHOENIX_CONNECTIONS_THROTTLED_COUNTER, PHOENIX_CONNECTION_CREATION_DURATION_MS
*/
assertEquals(3, listOfMetrics.get(queryServiceName).size());
assertEquals(4, listOfMetrics.get(queryServiceName).size());
for (ConnectionQueryServicesMetric metric : listOfMetrics.get(queryServiceName)) {
assertMetricValue(metric, OPEN_PHOENIX_CONNECTIONS_COUNTER, o, CompareOp.EQ);
assertMetricValue(metric, PHOENIX_CONNECTIONS_THROTTLED_COUNTER, ct, CompareOp.EQ);
assertMetricValue(metric, OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER, io, CompareOp.EQ);
assertMetricValue(metric, PHOENIX_CONNECTION_CREATION_DURATION_MS, cct, CompareOp.GT);
}
}

Expand All @@ -343,7 +349,7 @@ public void assertMetricValues(String queryServiceName, int o, int ct, int io) {
*/
public void assertMetricListIsEmpty() {
Map<String, List<ConnectionQueryServicesMetric>> listOfMetrics =
PhoenixRuntime.getAllConnectionQueryServicesCounters();
PhoenixRuntime.getAllConnectionQueryServicesMetrics();
assertTrue(listOfMetrics.isEmpty());
}

Expand Down