diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePerformanceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePerformanceIT.java new file mode 100644 index 00000000000..df243f34cba --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BasePerformanceIT.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.phoenix.util.TestUtil; +import org.junit.Before; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; + +/** + * 性能测试基类,提供通用的性能测试功能 + */ +@Category(ParallelStatsDisabledTest.class) +public abstract class BasePerformanceIT extends ParallelStatsDisabledIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(BasePerformanceIT.class); + + protected static final int DEFAULT_WARMUP_ITERATIONS = 3; + protected static final int DEFAULT_TEST_ITERATIONS = 10; + protected static final int DEFAULT_DATA_SIZE = 10000; + + protected Connection conn; + protected Statement stmt; + protected Random random; + + @Before + public void setUp() throws Exception { + conn = DriverManager.getConnection(getUrl()); + stmt = conn.createStatement(); + random = new Random(42); // 固定种子以确保可重复性 + } + + /** + * 执行性能测试 + * @param testName 测试名称 + * @param sql SQL语句 + * @param iterations 迭代次数 + * @return 性能测试结果 + */ + protected PerformanceTestResult runPerformanceTest(String testName, String sql, int iterations) { + return runPerformanceTest(testName, sql, iterations, DEFAULT_WARMUP_ITERATIONS); + } + + /** + * 执行性能测试 + * @param testName 测试名称 + * @param sql SQL语句 + * @param iterations 迭代次数 + * @param warmupIterations 预热迭代次数 + * @return 性能测试结果 + */ + protected PerformanceTestResult runPerformanceTest(String testName, String sql, int iterations, int warmupIterations) { + LOGGER.info("Starting performance test: {}", testName); + LOGGER.info("SQL: {}", sql); + LOGGER.info("Iterations: {}, Warmup iterations: {}", iterations, warmupIterations); + + List executionTimes = new ArrayList<>(); + + // 预热阶段 + LOGGER.info("Warming up..."); + for (int i = 0; i < warmupIterations; i++) { + try { + long startTime = System.currentTimeMillis(); + stmt.execute(sql); + long endTime = System.currentTimeMillis(); + LOGGER.debug("Warmup iteration {}: {}ms", i + 1, endTime - startTime); + } catch (SQLException e) { + LOGGER.warn("Warmup iteration {} failed: {}", i + 1, e.getMessage()); + } + } + + // 实际测试阶段 + LOGGER.info("Running performance test..."); + for (int i = 0; i < iterations; i++) { + try { + long startTime = System.currentTimeMillis(); + stmt.execute(sql); + long endTime = System.currentTimeMillis(); + long executionTime = endTime - startTime; + executionTimes.add(executionTime); + LOGGER.debug("Test iteration {}: {}ms", i + 1, executionTime); + } catch (SQLException e) { + LOGGER.error("Test iteration {} failed: {}", i + 1, e.getMessage()); + } + } + + PerformanceTestResult result = new PerformanceTestResult(testName, sql, executionTimes); + LOGGER.info("Performance test completed: {}", result); + + return result; + } + + /** + * 执行查询性能测试 + * @param testName 测试名称 + * @param sql SQL查询语句 + * @param iterations 迭代次数 + * @return 性能测试结果 + */ + protected PerformanceTestResult runQueryPerformanceTest(String testName, String sql, int iterations) { + return runQueryPerformanceTest(testName, sql, iterations, DEFAULT_WARMUP_ITERATIONS); + } + + /** + * 执行查询性能测试 + * @param testName 测试名称 + * @param sql SQL查询语句 + * @param iterations 迭代次数 + * @param warmupIterations 预热迭代次数 + * @return 性能测试结果 + */ + protected PerformanceTestResult runQueryPerformanceTest(String testName, String sql, int iterations, int warmupIterations) { + LOGGER.info("Starting query performance test: {}", testName); + LOGGER.info("SQL: {}", sql); + LOGGER.info("Iterations: {}, Warmup iterations: {}", iterations, warmupIterations); + + List executionTimes = new ArrayList<>(); + List rowCounts = new ArrayList<>(); + + // 预热阶段 + LOGGER.info("Warming up..."); + for (int i = 0; i < warmupIterations; i++) { + try { + long startTime = System.currentTimeMillis(); + ResultSet rs = stmt.executeQuery(sql); + int rowCount = 0; + while (rs.next()) { + rowCount++; + } + rs.close(); + long endTime = System.currentTimeMillis(); + LOGGER.debug("Warmup iteration {}: {}ms, {} rows", i + 1, endTime - startTime, rowCount); + } catch (SQLException e) { + LOGGER.warn("Warmup iteration {} failed: {}", i + 1, e.getMessage()); + } + } + + // 实际测试阶段 + LOGGER.info("Running query performance test..."); + for (int i = 0; i < iterations; i++) { + try { + long startTime = System.currentTimeMillis(); + ResultSet rs = stmt.executeQuery(sql); + int rowCount = 0; + while (rs.next()) { + rowCount++; + } + rs.close(); + long endTime = System.currentTimeMillis(); + long executionTime = endTime - startTime; + executionTimes.add(executionTime); + rowCounts.add(rowCount); + LOGGER.debug("Test iteration {}: {}ms, {} rows", i + 1, executionTime, rowCount); + } catch (SQLException e) { + LOGGER.error("Test iteration {} failed: {}", i + 1, e.getMessage()); + } + } + + PerformanceTestResult result = new PerformanceTestResult(testName, sql, executionTimes, rowCounts); + LOGGER.info("Query performance test completed: {}", result); + + return result; + } + + /** + * 生成测试数据 + * @param tableName 表名 + * @param rowCount 行数 + * @param batchSize 批量大小 + */ + protected void generateTestData(String tableName, int rowCount, int batchSize) throws SQLException { + LOGGER.info("Generating {} rows of test data for table {}", rowCount, tableName); + + String insertSql = "UPSERT INTO " + tableName + " (ID, NAME, AGE, SALARY, DEPT) VALUES (?, ?, ?, ?, ?)"; + PreparedStatement pstmt = conn.prepareStatement(insertSql); + + conn.setAutoCommit(false); + + try { + for (int i = 0; i < rowCount; i++) { + pstmt.setInt(1, i); + pstmt.setString(2, "User" + i); + pstmt.setInt(3, 20 + random.nextInt(50)); + pstmt.setDouble(4, 30000 + random.nextInt(70000)); + pstmt.setString(5, "Dept" + (i % 10)); + pstmt.addBatch(); + + if ((i + 1) % batchSize == 0) { + pstmt.executeBatch(); + LOGGER.debug("Inserted {} rows", i + 1); + } + } + + // 执行剩余的批次 + if (rowCount % batchSize != 0) { + pstmt.executeBatch(); + } + + conn.commit(); + LOGGER.info("Successfully generated {} rows of test data", rowCount); + } finally { + conn.setAutoCommit(true); + pstmt.close(); + } + } + + /** + * 创建测试表 + * @param tableName 表名 + */ + protected void createTestTable(String tableName) throws SQLException { + String createTableSql = "CREATE TABLE " + tableName + " (" + + "ID INTEGER PRIMARY KEY, " + + "NAME VARCHAR(50), " + + "AGE INTEGER, " + + "SALARY DOUBLE, " + + "DEPT VARCHAR(20)" + + ")"; + + stmt.execute(createTableSql); + LOGGER.info("Created test table: {}", tableName); + } + + /** + * 清理测试表 + * @param tableName 表名 + */ + protected void cleanupTestTable(String tableName) throws SQLException { + try { + stmt.execute("DROP TABLE IF EXISTS " + tableName); + LOGGER.info("Cleaned up test table: {}", tableName); + } catch (SQLException e) { + LOGGER.warn("Failed to cleanup table {}: {}", tableName, e.getMessage()); + } + } + + /** + * 性能测试结果类 + */ + public static class PerformanceTestResult { + private final String testName; + private final String sql; + private final List executionTimes; + private final List rowCounts; + + public PerformanceTestResult(String testName, String sql, List executionTimes) { + this(testName, sql, executionTimes, null); + } + + public PerformanceTestResult(String testName, String sql, List executionTimes, List rowCounts) { + this.testName = testName; + this.sql = sql; + this.executionTimes = executionTimes; + this.rowCounts = rowCounts; + } + + public String getTestName() { + return testName; + } + + public String getSql() { + return sql; + } + + public List getExecutionTimes() { + return executionTimes; + } + + public List getRowCounts() { + return rowCounts; + } + + public long getMinExecutionTime() { + return executionTimes.stream().mapToLong(Long::longValue).min().orElse(0); + } + + public long getMaxExecutionTime() { + return executionTimes.stream().mapToLong(Long::longValue).max().orElse(0); + } + + public double getAverageExecutionTime() { + return executionTimes.stream().mapToLong(Long::longValue).average().orElse(0.0); + } + + public double getStandardDeviation() { + double mean = getAverageExecutionTime(); + double variance = executionTimes.stream() + .mapToDouble(time -> Math.pow(time - mean, 2)) + .average() + .orElse(0.0); + return Math.sqrt(variance); + } + + public int getTotalRows() { + if (rowCounts == null) { + return 0; + } + return rowCounts.stream().mapToInt(Integer::intValue).sum(); + } + + public double getAverageRowCount() { + if (rowCounts == null) { + return 0.0; + } + return rowCounts.stream().mapToInt(Integer::intValue).average().orElse(0.0); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("PerformanceTestResult{"); + sb.append("testName='").append(testName).append('\''); + sb.append(", iterations=").append(executionTimes.size()); + sb.append(", minTime=").append(getMinExecutionTime()).append("ms"); + sb.append(", maxTime=").append(getMaxExecutionTime()).append("ms"); + sb.append(", avgTime=").append(String.format("%.2f", getAverageExecutionTime())).append("ms"); + sb.append(", stdDev=").append(String.format("%.2f", getStandardDeviation())).append("ms"); + if (rowCounts != null) { + sb.append(", totalRows=").append(getTotalRows()); + sb.append(", avgRows=").append(String.format("%.2f", getAverageRowCount())); + } + sb.append('}'); + return sb.toString(); + } + } +} \ No newline at end of file diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BatchPerformanceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BatchPerformanceIT.java new file mode 100644 index 00000000000..01a8d38c062 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BatchPerformanceIT.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertTrue; + +/** + * 批量操作性能测试类 + */ +@Category(ParallelStatsDisabledTest.class) +public class BatchPerformanceIT extends BasePerformanceIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(BatchPerformanceIT.class); + + private static final String TEST_TABLE = "BATCH_PERFORMANCE_TEST"; + private static final int[] BATCH_SIZES = {100, 500, 1000, 2000, 5000}; + private static final int TEST_ITERATIONS = 3; + + @Before + public void setUpTestTable() throws Exception { + super.setUp(); + createTestTable(TEST_TABLE); + } + + @After + public void cleanup() throws Exception { + cleanupTestTable(TEST_TABLE); + } + + @Test + public void testBatchInsertPerformance() throws Exception { + LOGGER.info("Testing batch insert performance with different batch sizes..."); + + for (int batchSize : BATCH_SIZES) { + PerformanceTestResult result = runBatchInsertTest(batchSize, 10000); + + LOGGER.info("Batch Insert Performance (batch size {}):", batchSize); + LOGGER.info(" Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info(" Rows per second: {}", calculateRowsPerSecond(result.getAverageExecutionTime(), 10000)); + + // 验证性能指标 - 批量插入应该比单行插入快 + assertTrue("Batch insert should be efficient", + result.getAverageExecutionTime() < 10000); // 10秒内完成10000行 + } + } + + @Test + public void testBatchUpdatePerformance() throws Exception { + // 先插入一些数据用于更新测试 + generateTestData(TEST_TABLE, 10000, 1000); + + LOGGER.info("Testing batch update performance with different batch sizes..."); + + for (int batchSize : BATCH_SIZES) { + PerformanceTestResult result = runBatchUpdateTest(batchSize, 5000); + + LOGGER.info("Batch Update Performance (batch size {}):", batchSize); + LOGGER.info(" Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info(" Rows per second: {}", calculateRowsPerSecond(result.getAverageExecutionTime(), 5000)); + + assertTrue("Batch update should be efficient", + result.getAverageExecutionTime() < 8000); + } + } + + @Test + public void testBatchDeletePerformance() throws Exception { + // 先插入一些数据用于删除测试 + generateTestData(TEST_TABLE, 10000, 1000); + + LOGGER.info("Testing batch delete performance with different batch sizes..."); + + for (int batchSize : BATCH_SIZES) { + PerformanceTestResult result = runBatchDeleteTest(batchSize, 3000); + + LOGGER.info("Batch Delete Performance (batch size {}):", batchSize); + LOGGER.info(" Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info(" Rows per second: {}", calculateRowsPerSecond(result.getAverageExecutionTime(), 3000)); + + assertTrue("Batch delete should be efficient", + result.getAverageExecutionTime() < 6000); + } + } + + @Test + public void testLargeBatchPerformance() throws Exception { + LOGGER.info("Testing large batch insert performance..."); + + PerformanceTestResult result = runBatchInsertTest(5000, 50000); + + LOGGER.info("Large Batch Insert Performance:"); + LOGGER.info(" Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info(" Total rows: 50000"); + LOGGER.info(" Rows per second: {}", calculateRowsPerSecond(result.getAverageExecutionTime(), 50000)); + + assertTrue("Large batch insert should be efficient", + result.getAverageExecutionTime() < 30000); // 30秒内完成50000行 + } + + @Test + public void testConcurrentBatchPerformance() throws Exception { + LOGGER.info("Testing concurrent batch operations..."); + + // 创建多个表进行并发测试 + String[] tables = { + TEST_TABLE + "_CONCURRENT_1", + TEST_TABLE + "_CONCURRENT_2", + TEST_TABLE + "_CONCURRENT_3" + }; + + for (String table : tables) { + createTestTable(table); + } + + try { + List results = new ArrayList<>(); + + // 并发执行批量插入 + for (int i = 0; i < tables.length; i++) { + PerformanceTestResult result = runBatchInsertTest(1000, 5000, tables[i]); + results.add(result); + + LOGGER.info("Concurrent Batch {} Performance: {}ms avg", i + 1, result.getAverageExecutionTime()); + } + + // 验证所有并发操作都成功 + for (PerformanceTestResult result : results) { + assertTrue("Concurrent batch operation should be efficient", + result.getAverageExecutionTime() < 15000); + } + + } finally { + // 清理测试表 + for (String table : tables) { + cleanupTestTable(table); + } + } + } + + @Test + public void testMixedBatchOperations() throws Exception { + LOGGER.info("Testing mixed batch operations (insert, update, delete)..."); + + // 先插入基础数据 + generateTestData(TEST_TABLE, 10000, 1000); + + // 测试混合操作 + PerformanceTestResult insertResult = runBatchInsertTest(1000, 2000); + PerformanceTestResult updateResult = runBatchUpdateTest(1000, 2000); + PerformanceTestResult deleteResult = runBatchDeleteTest(1000, 2000); + + LOGGER.info("Mixed Batch Operations Performance:"); + LOGGER.info(" Insert: {}ms avg", insertResult.getAverageExecutionTime()); + LOGGER.info(" Update: {}ms avg", updateResult.getAverageExecutionTime()); + LOGGER.info(" Delete: {}ms avg", deleteResult.getAverageExecutionTime()); + + assertTrue("Mixed batch operations should be efficient", + insertResult.getAverageExecutionTime() < 5000 && + updateResult.getAverageExecutionTime() < 5000 && + deleteResult.getAverageExecutionTime() < 5000); + } + + /** + * 运行批量插入性能测试 + */ + private PerformanceTestResult runBatchInsertTest(int batchSize, int totalRows) throws SQLException { + return runBatchInsertTest(batchSize, totalRows, TEST_TABLE); + } + + /** + * 运行批量插入性能测试 + */ + private PerformanceTestResult runBatchInsertTest(int batchSize, int totalRows, String tableName) throws SQLException { + String testName = "Batch Insert Performance (batch size " + batchSize + ")"; + List executionTimes = new ArrayList<>(); + + for (int iteration = 0; iteration < TEST_ITERATIONS; iteration++) { + // 清空表 + stmt.execute("DELETE FROM " + tableName); + + String insertSql = "UPSERT INTO " + tableName + " (ID, NAME, AGE, SALARY, DEPT) VALUES (?, ?, ?, ?, ?)"; + PreparedStatement pstmt = conn.prepareStatement(insertSql); + + conn.setAutoCommit(false); + + try { + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < totalRows; i++) { + pstmt.setInt(1, i); + pstmt.setString(2, "User" + i); + pstmt.setInt(3, 20 + random.nextInt(50)); + pstmt.setDouble(4, 30000 + random.nextInt(70000)); + pstmt.setString(5, "Dept" + (i % 10)); + pstmt.addBatch(); + + if ((i + 1) % batchSize == 0) { + pstmt.executeBatch(); + } + } + + // 执行剩余的批次 + if (totalRows % batchSize != 0) { + pstmt.executeBatch(); + } + + conn.commit(); + + long endTime = System.currentTimeMillis(); + long executionTime = endTime - startTime; + executionTimes.add(executionTime); + + LOGGER.debug("Batch insert iteration {}: {}ms", iteration + 1, executionTime); + + } finally { + conn.setAutoCommit(true); + pstmt.close(); + } + } + + return new PerformanceTestResult(testName, "Batch Insert", executionTimes); + } + + /** + * 运行批量更新性能测试 + */ + private PerformanceTestResult runBatchUpdateTest(int batchSize, int totalRows) throws SQLException { + String testName = "Batch Update Performance (batch size " + batchSize + ")"; + List executionTimes = new ArrayList<>(); + + for (int iteration = 0; iteration < TEST_ITERATIONS; iteration++) { + String updateSql = "UPDATE " + TEST_TABLE + " SET SALARY = SALARY * 1.1 WHERE ID = ?"; + PreparedStatement pstmt = conn.prepareStatement(updateSql); + + conn.setAutoCommit(false); + + try { + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < totalRows; i++) { + pstmt.setInt(1, i); + pstmt.addBatch(); + + if ((i + 1) % batchSize == 0) { + pstmt.executeBatch(); + } + } + + // 执行剩余的批次 + if (totalRows % batchSize != 0) { + pstmt.executeBatch(); + } + + conn.commit(); + + long endTime = System.currentTimeMillis(); + long executionTime = endTime - startTime; + executionTimes.add(executionTime); + + LOGGER.debug("Batch update iteration {}: {}ms", iteration + 1, executionTime); + + } finally { + conn.setAutoCommit(true); + pstmt.close(); + } + } + + return new PerformanceTestResult(testName, "Batch Update", executionTimes); + } + + /** + * 运行批量删除性能测试 + */ + private PerformanceTestResult runBatchDeleteTest(int batchSize, int totalRows) throws SQLException { + String testName = "Batch Delete Performance (batch size " + batchSize + ")"; + List executionTimes = new ArrayList<>(); + + for (int iteration = 0; iteration < TEST_ITERATIONS; iteration++) { + String deleteSql = "DELETE FROM " + TEST_TABLE + " WHERE ID = ?"; + PreparedStatement pstmt = conn.prepareStatement(deleteSql); + + conn.setAutoCommit(false); + + try { + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < totalRows; i++) { + pstmt.setInt(1, i); + pstmt.addBatch(); + + if ((i + 1) % batchSize == 0) { + pstmt.executeBatch(); + } + } + + // 执行剩余的批次 + if (totalRows % batchSize != 0) { + pstmt.executeBatch(); + } + + conn.commit(); + + long endTime = System.currentTimeMillis(); + long executionTime = endTime - startTime; + executionTimes.add(executionTime); + + LOGGER.debug("Batch delete iteration {}: {}ms", iteration + 1, executionTime); + + } finally { + conn.setAutoCommit(true); + pstmt.close(); + } + } + + return new PerformanceTestResult(testName, "Batch Delete", executionTimes); + } + + /** + * 计算每秒处理的行数 + */ + private double calculateRowsPerSecond(long executionTimeMs, int totalRows) { + if (executionTimeMs == 0) return 0.0; + return (double) totalRows / (executionTimeMs / 1000.0); + } +} \ No newline at end of file diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPerformanceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPerformanceIT.java new file mode 100644 index 00000000000..168357037b5 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryPerformanceIT.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; + +import static org.junit.Assert.assertTrue; + +/** + * 查询性能测试类 + */ +@Category(ParallelStatsDisabledTest.class) +public class QueryPerformanceIT extends BasePerformanceIT { + + private static final Logger LOGGER = LoggerFactory.getLogger(QueryPerformanceIT.class); + + private static final String TEST_TABLE = "QUERY_PERFORMANCE_TEST"; + private static final int TEST_DATA_SIZE = 50000; + private static final int BATCH_SIZE = 1000; + private static final int TEST_ITERATIONS = 5; + + @Before + public void setUpTestData() throws Exception { + super.setUp(); + + // 创建测试表 + createTestTable(TEST_TABLE); + + // 生成测试数据 + generateTestData(TEST_TABLE, TEST_DATA_SIZE, BATCH_SIZE); + + // 创建索引以提高查询性能 + stmt.execute("CREATE INDEX " + TEST_TABLE + "_NAME_IDX ON " + TEST_TABLE + " (NAME)"); + stmt.execute("CREATE INDEX " + TEST_TABLE + "_DEPT_IDX ON " + TEST_TABLE + " (DEPT)"); + stmt.execute("CREATE INDEX " + TEST_TABLE + "_AGE_IDX ON " + TEST_TABLE + " (AGE)"); + + LOGGER.info("Test setup completed with {} rows of data", TEST_DATA_SIZE); + } + + @After + public void cleanup() throws Exception { + cleanupTestTable(TEST_TABLE); + } + + @Test + public void testSimpleSelectPerformance() throws Exception { + String sql = "SELECT * FROM " + TEST_TABLE + " LIMIT 1000"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Simple Select Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Simple Select Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Min execution time: {}ms", result.getMinExecutionTime()); + LOGGER.info("Max execution time: {}ms", result.getMaxExecutionTime()); + LOGGER.info("Standard deviation: {}ms", result.getStandardDeviation()); + + // 验证性能指标 + assertTrue("Average execution time should be reasonable", + result.getAverageExecutionTime() < 5000); // 5秒内 + } + + @Test + public void testFilteredQueryPerformance() throws Exception { + String sql = "SELECT * FROM " + TEST_TABLE + " WHERE AGE > 30 AND SALARY > 50000"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Filtered Query Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Filtered Query Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Average rows returned: {}", result.getAverageRowCount()); + + assertTrue("Average execution time should be reasonable", + result.getAverageExecutionTime() < 3000); + } + + @Test + public void testAggregationQueryPerformance() throws Exception { + String sql = "SELECT DEPT, COUNT(*), AVG(SALARY), MAX(AGE) FROM " + TEST_TABLE + + " GROUP BY DEPT ORDER BY AVG(SALARY) DESC"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Aggregation Query Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Aggregation Query Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Average rows returned: {}", result.getAverageRowCount()); + + assertTrue("Average execution time should be reasonable", + result.getAverageExecutionTime() < 4000); + } + + @Test + public void testIndexedQueryPerformance() throws Exception { + String sql = "SELECT * FROM " + TEST_TABLE + " WHERE NAME = 'User1000'"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Indexed Query Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Indexed Query Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Average rows returned: {}", result.getAverageRowCount()); + + // 索引查询应该很快 + assertTrue("Indexed query should be fast", + result.getAverageExecutionTime() < 1000); + } + + @Test + public void testRangeQueryPerformance() throws Exception { + String sql = "SELECT * FROM " + TEST_TABLE + " WHERE AGE BETWEEN 25 AND 35 ORDER BY SALARY DESC LIMIT 100"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Range Query Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Range Query Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Average rows returned: {}", result.getAverageRowCount()); + + assertTrue("Range query should be reasonable", + result.getAverageExecutionTime() < 2000); + } + + @Test + public void testComplexQueryPerformance() throws Exception { + String sql = "SELECT DEPT, COUNT(*) as EMP_COUNT, AVG(SALARY) as AVG_SALARY, " + + "MAX(AGE) as MAX_AGE, MIN(AGE) as MIN_AGE " + + "FROM " + TEST_TABLE + " " + + "WHERE SALARY > 40000 AND AGE > 25 " + + "GROUP BY DEPT " + + "HAVING COUNT(*) > 100 " + + "ORDER BY AVG_SALARY DESC"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Complex Query Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Complex Query Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Average rows returned: {}", result.getAverageRowCount()); + + assertTrue("Complex query should be reasonable", + result.getAverageExecutionTime() < 5000); + } + + @Test + public void testJoinQueryPerformance() throws Exception { + // 创建第二个表用于JOIN测试 + String joinTable = TEST_TABLE + "_JOIN"; + stmt.execute("CREATE TABLE " + joinTable + " (" + + "DEPT_ID VARCHAR(20) PRIMARY KEY, " + + "DEPT_NAME VARCHAR(50), " + + "LOCATION VARCHAR(50)" + + ")"); + + // 插入JOIN表数据 + for (int i = 0; i < 10; i++) { + stmt.execute("UPSERT INTO " + joinTable + " VALUES ('Dept" + i + "', 'Department " + i + "', 'Location " + i + "')"); + } + + String sql = "SELECT t.NAME, t.SALARY, j.DEPT_NAME, j.LOCATION " + + "FROM " + TEST_TABLE + " t " + + "JOIN " + joinTable + " j ON t.DEPT = j.DEPT_ID " + + "WHERE t.SALARY > 50000 " + + "ORDER BY t.SALARY DESC LIMIT 100"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Join Query Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Join Query Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Average rows returned: {}", result.getAverageRowCount()); + + assertTrue("Join query should be reasonable", + result.getAverageExecutionTime() < 4000); + + // 清理JOIN表 + stmt.execute("DROP TABLE IF EXISTS " + joinTable); + } + + @Test + public void testSubqueryPerformance() throws Exception { + String sql = "SELECT * FROM " + TEST_TABLE + " t1 " + + "WHERE SALARY > (SELECT AVG(SALARY) FROM " + TEST_TABLE + " t2 WHERE t2.DEPT = t1.DEPT) " + + "ORDER BY SALARY DESC LIMIT 50"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Subquery Performance", sql, TEST_ITERATIONS); + + LOGGER.info("Subquery Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Average rows returned: {}", result.getAverageRowCount()); + + assertTrue("Subquery should be reasonable", + result.getAverageExecutionTime() < 6000); + } + + @Test + public void testConcurrentQueryPerformance() throws Exception { + // 测试并发查询性能 + String[] queries = { + "SELECT COUNT(*) FROM " + TEST_TABLE, + "SELECT AVG(SALARY) FROM " + TEST_TABLE + " WHERE AGE > 30", + "SELECT DEPT, COUNT(*) FROM " + TEST_TABLE + " GROUP BY DEPT", + "SELECT * FROM " + TEST_TABLE + " WHERE NAME LIKE 'User%' LIMIT 100" + }; + + LOGGER.info("Starting concurrent query performance test..."); + + for (int i = 0; i < queries.length; i++) { + PerformanceTestResult result = runQueryPerformanceTest( + "Concurrent Query " + (i + 1), queries[i], TEST_ITERATIONS); + + LOGGER.info("Concurrent Query {} Performance: {}ms avg", i + 1, result.getAverageExecutionTime()); + + assertTrue("Concurrent query should be reasonable", + result.getAverageExecutionTime() < 3000); + } + } + + @Test + public void testLargeResultSetPerformance() throws Exception { + String sql = "SELECT * FROM " + TEST_TABLE + " ORDER BY ID"; + + PerformanceTestResult result = runQueryPerformanceTest( + "Large Result Set Performance", sql, 3); // 减少迭代次数,因为结果集很大 + + LOGGER.info("Large Result Set Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info("Total rows returned: {}", result.getTotalRows()); + + assertTrue("Large result set query should be reasonable", + result.getAverageExecutionTime() < 10000); + } + + @Test + public void testUpdatePerformance() throws Exception { + String updateSql = "UPDATE " + TEST_TABLE + " SET SALARY = SALARY * 1.1 WHERE AGE > 40"; + + PerformanceTestResult result = runPerformanceTest( + "Update Performance", updateSql, 3); // 减少迭代次数,因为这是更新操作 + + LOGGER.info("Update Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + + assertTrue("Update operation should be reasonable", + result.getAverageExecutionTime() < 5000); + } + + @Test + public void testDeletePerformance() throws Exception { + // 先创建一些要删除的数据 + stmt.execute("CREATE TABLE " + TEST_TABLE + "_DELETE AS SELECT * FROM " + TEST_TABLE + " WHERE ID < 1000"); + + String deleteSql = "DELETE FROM " + TEST_TABLE + "_DELETE WHERE AGE < 25"; + + PerformanceTestResult result = runPerformanceTest( + "Delete Performance", deleteSql, 3); + + LOGGER.info("Delete Performance Test Results:"); + LOGGER.info("Average execution time: {}ms", result.getAverageExecutionTime()); + + assertTrue("Delete operation should be reasonable", + result.getAverageExecutionTime() < 3000); + + // 清理 + stmt.execute("DROP TABLE IF EXISTS " + TEST_TABLE + "_DELETE"); + } +} \ No newline at end of file diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/README_PERFORMANCE.md b/phoenix-core/src/it/java/org/apache/phoenix/end2end/README_PERFORMANCE.md new file mode 100644 index 00000000000..33e6dede08c --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/README_PERFORMANCE.md @@ -0,0 +1,252 @@ +# Phoenix SQL执行日志和性能测试功能 + +本文档介绍了Phoenix项目中新增的SQL执行日志和性能测试功能。 + +## 1. SQL执行日志功能 + +### 1.1 功能概述 + +新增的SQL执行日志功能提供了详细的SQL执行信息记录,包括: +- SQL语句执行时间 +- 影响的行数 +- 执行状态(成功/失败) +- 慢查询检测 +- 批量操作性能监控 +- 连接信息记录 + +### 1.2 日志配置 + +日志配置位于 `phoenix-core/src/test/resources/log4j2-test.properties`,包含以下配置: + +```properties +# SQL执行日志文件appender +appender.sql.type = File +appender.sql.name = SQLFile +appender.sql.fileName = ${sys:phoenix.log.dir:-logs}/phoenix-sql-execution.log +appender.sql.filePattern = ${sys:phoenix.log.dir:-logs}/phoenix-sql-execution-%d{yyyy-MM-dd}-%i.log.gz + +# SQL执行日志配置 +logger.sql.name = org.apache.phoenix.jdbc +logger.sql.level = INFO +logger.sql.additivity = false +logger.sql.appenderRef.sql.ref = SQLFile + +# 查询执行时间日志 +logger.queryTime.name = org.apache.phoenix.query +logger.queryTime.level = INFO +logger.queryTime.additivity = false +logger.queryTime.appenderRef.sql.ref = SQLFile + +# 性能监控日志 +logger.performance.name = org.apache.phoenix.monitoring +logger.performance.level = INFO +logger.performance.additivity = false +logger.performance.appenderRef.sql.ref = SQLFile +``` + +### 1.3 日志格式 + +SQL执行日志包含以下信息: + +``` +SQL_EXECUTION_START [Q1] SQL: SELECT * FROM users WHERE id = 1 PARAMETERS: [1] +SQL_EXECUTION_END [Q1] STATUS: SUCCESS EXECUTION_TIME: 150ms ROWS_AFFECTED: 1 +QUERY_TIME [Q1] 150ms +``` + +### 1.4 慢查询检测 + +系统会自动检测慢查询并记录警告: + +``` +SLOW_QUERY [Q1] 1500ms exceeds default threshold 1000ms +``` + +可以通过以下方式设置慢查询阈值: + +```java +SQLLogger.setSlowQueryThreshold("SELECT", 1000); +SQLLogger.setSlowQueryThreshold("INSERT", 2000); +``` + +### 1.5 使用示例 + +```java +// 记录查询开始 +String queryId = SQLLogger.logQueryStart("SELECT * FROM users WHERE age > 25"); + +// 执行查询 +ResultSet rs = stmt.executeQuery("SELECT * FROM users WHERE age > 25"); +int rowCount = 0; +while (rs.next()) { + rowCount++; +} + +// 记录查询结束 +SQLLogger.logQueryEnd(queryId, rowCount, true); +``` + +## 2. 性能测试功能 + +### 2.1 功能概述 + +新增的性能测试功能提供了全面的SQL性能测试框架,包括: +- 查询性能测试 +- 批量操作性能测试 +- 并发性能测试 +- 性能指标统计 +- 自动化性能验证 + +### 2.2 测试基类 + +`BasePerformanceIT` 提供了通用的性能测试功能: + +```java +public abstract class BasePerformanceIT extends ParallelStatsDisabledIT { + // 执行性能测试 + protected PerformanceTestResult runPerformanceTest(String testName, String sql, int iterations); + + // 执行查询性能测试 + protected PerformanceTestResult runQueryPerformanceTest(String testName, String sql, int iterations); + + // 生成测试数据 + protected void generateTestData(String tableName, int rowCount, int batchSize); + + // 创建测试表 + protected void createTestTable(String tableName); +} +``` + +### 2.3 性能测试结果 + +`PerformanceTestResult` 类提供详细的性能指标: + +```java +public class PerformanceTestResult { + public long getMinExecutionTime(); // 最小执行时间 + public long getMaxExecutionTime(); // 最大执行时间 + public double getAverageExecutionTime(); // 平均执行时间 + public double getStandardDeviation(); // 标准差 + public int getTotalRows(); // 总行数 + public double getAverageRowCount(); // 平均行数 +} +``` + +### 2.4 查询性能测试 + +`QueryPerformanceIT` 提供了各种查询类型的性能测试: + +```java +@Test +public void testSimpleSelectPerformance() throws Exception { + String sql = "SELECT * FROM " + TEST_TABLE + " LIMIT 1000"; + PerformanceTestResult result = runQueryPerformanceTest( + "Simple Select Performance", sql, TEST_ITERATIONS); + + assertTrue("Average execution time should be reasonable", + result.getAverageExecutionTime() < 5000); +} +``` + +### 2.5 批量操作性能测试 + +`BatchPerformanceIT` 提供了批量操作的性能测试: + +```java +@Test +public void testBatchInsertPerformance() throws Exception { + for (int batchSize : BATCH_SIZES) { + PerformanceTestResult result = runBatchInsertTest(batchSize, 10000); + + LOGGER.info("Batch Insert Performance (batch size {}):", batchSize); + LOGGER.info(" Average execution time: {}ms", result.getAverageExecutionTime()); + LOGGER.info(" Rows per second: {}", calculateRowsPerSecond(result.getAverageExecutionTime(), 10000)); + } +} +``` + +### 2.6 运行性能测试 + +#### 2.6.1 运行所有性能测试 + +```bash +mvn test -Dtest=*PerformanceIT +``` + +#### 2.6.2 运行特定性能测试 + +```bash +mvn test -Dtest=QueryPerformanceIT +mvn test -Dtest=BatchPerformanceIT +``` + +#### 2.6.3 运行单个测试方法 + +```bash +mvn test -Dtest=QueryPerformanceIT#testSimpleSelectPerformance +``` + +### 2.7 性能测试配置 + +可以通过以下方式调整性能测试参数: + +```java +// 在测试类中设置 +private static final int TEST_ITERATIONS = 10; // 测试迭代次数 +private static final int DEFAULT_WARMUP_ITERATIONS = 3; // 预热迭代次数 +private static final int TEST_DATA_SIZE = 50000; // 测试数据大小 +``` + +### 2.8 性能测试最佳实践 + +1. **预热阶段**:在正式测试前进行预热,确保JVM和数据库缓存已预热 +2. **多次迭代**:运行多次迭代以获得更准确的性能数据 +3. **统计分析**:使用平均值、标准差等统计指标评估性能 +4. **环境一致性**:确保测试环境的一致性,避免外部因素影响 +5. **监控资源**:在性能测试期间监控CPU、内存、网络等资源使用情况 + +### 2.9 性能基准 + +以下是一些性能基准参考: + +| 操作类型 | 预期性能 | 说明 | +|---------|---------|------| +| 简单查询 | < 5秒 | 包含LIMIT的简单SELECT | +| 过滤查询 | < 3秒 | 带WHERE条件的查询 | +| 聚合查询 | < 4秒 | GROUP BY、聚合函数 | +| 索引查询 | < 1秒 | 使用索引的查询 | +| 批量插入 | < 10秒/万行 | 批量插入操作 | +| 批量更新 | < 8秒/万行 | 批量更新操作 | +| 批量删除 | < 6秒/万行 | 批量删除操作 | + +### 2.10 故障排除 + +#### 2.10.1 常见问题 + +1. **测试超时**:增加测试超时时间或减少数据量 +2. **内存不足**:减少测试数据量或增加JVM内存 +3. **连接超时**:检查数据库连接配置 +4. **日志文件过大**:调整日志轮转配置 + +#### 2.10.2 调试技巧 + +1. 启用DEBUG日志级别查看详细信息 +2. 使用性能分析工具分析瓶颈 +3. 检查数据库执行计划 +4. 监控系统资源使用情况 + +## 3. 总结 + +新增的SQL执行日志和性能测试功能为Phoenix项目提供了: + +1. **详细的SQL执行监控**:记录所有SQL执行的详细信息 +2. **慢查询检测**:自动识别和警告慢查询 +3. **全面的性能测试**:覆盖各种SQL操作类型的性能测试 +4. **自动化性能验证**:确保性能指标符合预期 +5. **性能基准**:提供性能基准参考 + +这些功能有助于: +- 监控生产环境中的SQL性能 +- 识别性能瓶颈 +- 验证性能优化效果 +- 确保系统性能符合要求 \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index d3df4647095..eaee06d0513 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -2036,11 +2036,15 @@ public void clearBatch() throws SQLException { */ @Override public int[] executeBatch() throws SQLException { + String queryId = SQLLogger.logQueryStart("BATCH_EXECUTION"); + long startTime = System.currentTimeMillis(); + int i = 0; int[] returnCodes = new int [batch.size()]; Arrays.fill(returnCodes, -1); boolean autoCommit = connection.getAutoCommit(); connection.setAutoCommit(false); + try { for (i = 0; i < returnCodes.length; i++) { PhoenixPreparedStatement statement = batch.get(i); @@ -2054,8 +2058,24 @@ public int[] executeBatch() throws SQLException { if (autoCommit) { connection.commit(); } + + // 计算总影响行数 + int totalRows = 0; + for (int result : returnCodes) { + if (result > 0) { + totalRows += result; + } + } + + long endTime = System.currentTimeMillis(); + long executionTime = endTime - startTime; + + SQLLogger.logQueryEnd(queryId, totalRows, true); + SQLLogger.logBatchOperation("BATCH_EXECUTION", batch.size(), executionTime); + return returnCodes; } catch (SQLException t) { + SQLLogger.logQueryException(queryId, t); if (i == returnCodes.length) { // Exception after for loop, perhaps in commit(), discard returnCodes. throw new BatchUpdateException(t); @@ -2214,26 +2234,52 @@ public ResultSet executeQuery(String sql) throws SQLException { "Execute query: " + sql, connection)); } - CompilableStatement stmt = parseStatement(sql); - if (stmt.getOperation().isMutation()) { - throw new ExecuteQueryNotApplicableException(sql); + String queryId = SQLLogger.logQueryStart(sql); + + try { + CompilableStatement stmt = parseStatement(sql); + if (stmt.getOperation().isMutation()) { + throw new ExecuteQueryNotApplicableException(sql); + } + ResultSet resultSet = executeQuery(stmt, createQueryLogger(stmt, sql)); + + // 获取结果集行数(如果可能) + int rowCount = 0; + if (resultSet instanceof PhoenixResultSet) { + // 这里可以尝试获取行数,但可能需要遍历结果集 + // 为了性能考虑,我们暂时不计算行数 + } + + SQLLogger.logQueryEnd(queryId, rowCount, true); + return resultSet; + } catch (SQLException e) { + SQLLogger.logQueryException(queryId, e); + throw e; } - return executeQuery(stmt, createQueryLogger(stmt, sql)); } @Override public int executeUpdate(String sql) throws SQLException { - CompilableStatement stmt = parseStatement(sql); - if (!stmt.getOperation().isMutation) { - throw new ExecuteUpdateNotApplicableException(sql); - } - if (!batch.isEmpty()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) - .build().buildException(); + String queryId = SQLLogger.logQueryStart(sql); + + try { + CompilableStatement stmt = parseStatement(sql); + if (!stmt.getOperation().isMutation) { + throw new ExecuteUpdateNotApplicableException(sql); + } + if (!batch.isEmpty()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) + .build().buildException(); + } + int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql)); + flushIfNecessary(); + + SQLLogger.logQueryEnd(queryId, updateCount, true); + return updateCount; + } catch (SQLException e) { + SQLLogger.logQueryException(queryId, e); + throw e; } - int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql)); - flushIfNecessary(); - return updateCount; } private void flushIfNecessary() throws SQLException { @@ -2244,19 +2290,29 @@ private void flushIfNecessary() throws SQLException { @Override public boolean execute(String sql) throws SQLException { - CompilableStatement stmt = parseStatement(sql); - if (stmt.getOperation().isMutation()) { - if (!batch.isEmpty()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) - .build().buildException(); + String queryId = SQLLogger.logQueryStart(sql); + + try { + CompilableStatement stmt = parseStatement(sql); + if (stmt.getOperation().isMutation()) { + if (!batch.isEmpty()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH) + .build().buildException(); + } + int updateCount = executeMutation(stmt, createAuditQueryLogger(stmt, sql)); + flushIfNecessary(); + + SQLLogger.logQueryEnd(queryId, updateCount, true); + return false; } - executeMutation(stmt, createAuditQueryLogger(stmt, sql)); - flushIfNecessary(); - return false; + + executeQuery(stmt, createQueryLogger(stmt, sql)); + SQLLogger.logQueryEnd(queryId, 0, true); + return true; + } catch (SQLException e) { + SQLLogger.logQueryException(queryId, e); + throw e; } - - executeQuery(stmt, createQueryLogger(stmt, sql)); - return true; } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/SQLLogger.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/SQLLogger.java new file mode 100644 index 00000000000..c5ddf60859b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/SQLLogger.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * SQL执行日志记录器,用于记录SQL执行的详细信息 + */ +public class SQLLogger { + private static final Logger LOGGER = LoggerFactory.getLogger(SQLLogger.class); + private static final Logger SQL_EXECUTION_LOGGER = LoggerFactory.getLogger("org.apache.phoenix.jdbc"); + private static final Logger QUERY_TIME_LOGGER = LoggerFactory.getLogger("org.apache.phoenix.query"); + private static final Logger PERFORMANCE_LOGGER = LoggerFactory.getLogger("org.apache.phoenix.monitoring"); + + private static final AtomicLong queryCounter = new AtomicLong(0); + private static final Map slowQueryThresholds = new ConcurrentHashMap<>(); + + // 默认慢查询阈值(毫秒) + private static final long DEFAULT_SLOW_QUERY_THRESHOLD = 1000; + + /** + * 记录SQL执行开始 + * @param sql SQL语句 + * @param parameters 参数(可选) + * @return 查询ID + */ + public static String logQueryStart(String sql, Object... parameters) { + String queryId = "Q" + queryCounter.incrementAndGet(); + long startTime = System.currentTimeMillis(); + + StringBuilder logMessage = new StringBuilder(); + logMessage.append("SQL_EXECUTION_START [").append(queryId).append("] "); + logMessage.append("SQL: ").append(sql); + + if (parameters != null && parameters.length > 0) { + logMessage.append(" PARAMETERS: ["); + for (int i = 0; i < parameters.length; i++) { + if (i > 0) logMessage.append(", "); + logMessage.append(parameters[i]); + } + logMessage.append("]"); + } + + SQL_EXECUTION_LOGGER.info(logMessage.toString()); + + // 存储开始时间 + ThreadLocalQueryContext.setStartTime(startTime); + ThreadLocalQueryContext.setQueryId(queryId); + + return queryId; + } + + /** + * 记录SQL执行完成 + * @param queryId 查询ID + * @param rowCount 影响的行数 + * @param success 是否成功 + */ + public static void logQueryEnd(String queryId, int rowCount, boolean success) { + long endTime = System.currentTimeMillis(); + Long startTime = ThreadLocalQueryContext.getStartTime(); + + if (startTime != null) { + long executionTime = endTime - startTime; + String status = success ? "SUCCESS" : "FAILED"; + + StringBuilder logMessage = new StringBuilder(); + logMessage.append("SQL_EXECUTION_END [").append(queryId).append("] "); + logMessage.append("STATUS: ").append(status).append(" "); + logMessage.append("EXECUTION_TIME: ").append(executionTime).append("ms "); + logMessage.append("ROWS_AFFECTED: ").append(rowCount); + + SQL_EXECUTION_LOGGER.info(logMessage.toString()); + + // 记录查询时间 + QUERY_TIME_LOGGER.info("QUERY_TIME [{}] {}ms", queryId, executionTime); + + // 检查是否为慢查询 + checkSlowQuery(queryId, executionTime); + + // 清理ThreadLocal + ThreadLocalQueryContext.clear(); + } + } + + /** + * 记录SQL执行异常 + * @param queryId 查询ID + * @param exception 异常 + */ + public static void logQueryException(String queryId, SQLException exception) { + long endTime = System.currentTimeMillis(); + Long startTime = ThreadLocalQueryContext.getStartTime(); + + if (startTime != null) { + long executionTime = endTime - startTime; + + StringBuilder logMessage = new StringBuilder(); + logMessage.append("SQL_EXECUTION_ERROR [").append(queryId).append("] "); + logMessage.append("EXECUTION_TIME: ").append(executionTime).append("ms "); + logMessage.append("ERROR: ").append(exception.getMessage()); + logMessage.append(" ERROR_CODE: ").append(exception.getErrorCode()); + logMessage.append(" SQL_STATE: ").append(exception.getSQLState()); + + SQL_EXECUTION_LOGGER.error(logMessage.toString(), exception); + + // 清理ThreadLocal + ThreadLocalQueryContext.clear(); + } + } + + /** + * 设置慢查询阈值 + * @param sqlType SQL类型(如SELECT、INSERT等) + * @param threshold 阈值(毫秒) + */ + public static void setSlowQueryThreshold(String sqlType, long threshold) { + slowQueryThresholds.put(sqlType.toUpperCase(), threshold); + } + + /** + * 检查是否为慢查询 + * @param queryId 查询ID + * @param executionTime 执行时间 + */ + private static void checkSlowQuery(String queryId, long executionTime) { + long threshold = DEFAULT_SLOW_QUERY_THRESHOLD; + + // 可以根据SQL类型设置不同的阈值 + for (Map.Entry entry : slowQueryThresholds.entrySet()) { + if (executionTime > entry.getValue()) { + PERFORMANCE_LOGGER.warn("SLOW_QUERY [{}] {}ms exceeds threshold {}ms for type {}", + queryId, executionTime, entry.getValue(), entry.getKey()); + return; + } + } + + if (executionTime > threshold) { + PERFORMANCE_LOGGER.warn("SLOW_QUERY [{}] {}ms exceeds default threshold {}ms", + queryId, executionTime, threshold); + } + } + + /** + * 记录批量操作 + * @param operation 操作类型 + * @param batchSize 批量大小 + * @param executionTime 执行时间 + */ + public static void logBatchOperation(String operation, int batchSize, long executionTime) { + PERFORMANCE_LOGGER.info("BATCH_OPERATION {} BATCH_SIZE: {} EXECUTION_TIME: {}ms", + operation, batchSize, executionTime); + } + + /** + * 记录连接信息 + * @param connectionId 连接ID + * @param operation 操作类型 + * @param details 详细信息 + */ + public static void logConnectionInfo(String connectionId, String operation, String details) { + SQL_EXECUTION_LOGGER.info("CONNECTION [{}] {}: {}", connectionId, operation, details); + } + + /** + * ThreadLocal上下文,用于存储查询相关信息 + */ + private static class ThreadLocalQueryContext { + private static final ThreadLocal startTime = new ThreadLocal<>(); + private static final ThreadLocal queryId = new ThreadLocal<>(); + + public static void setStartTime(long time) { + startTime.set(time); + } + + public static Long getStartTime() { + return startTime.get(); + } + + public static void setQueryId(String id) { + queryId.set(id); + } + + public static String getQueryId() { + return queryId.get(); + } + + public static void clear() { + startTime.remove(); + queryId.remove(); + } + } +} \ No newline at end of file diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/SQLLoggerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/SQLLoggerTest.java new file mode 100644 index 00000000000..d6b7957835e --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/SQLLoggerTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.jdbc; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * SQLLogger功能测试 + */ +@Category(ParallelStatsDisabledTest.class) +public class SQLLoggerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(SQLLoggerTest.class); + + @Test + public void testLogQueryStart() { + String sql = "SELECT * FROM test_table WHERE id = 1"; + String queryId = SQLLogger.logQueryStart(sql); + + assertNotNull("Query ID should not be null", queryId); + assertTrue("Query ID should start with 'Q'", queryId.startsWith("Q")); + + LOGGER.info("Generated query ID: {}", queryId); + } + + @Test + public void testLogQueryStartWithParameters() { + String sql = "SELECT * FROM test_table WHERE id = ? AND name = ?"; + Object[] parameters = {1, "test"}; + + String queryId = SQLLogger.logQueryStart(sql, parameters); + + assertNotNull("Query ID should not be null", queryId); + assertTrue("Query ID should start with 'Q'", queryId.startsWith("Q")); + + LOGGER.info("Generated query ID with parameters: {}", queryId); + } + + @Test + public void testLogQueryEnd() { + String sql = "SELECT * FROM test_table"; + String queryId = SQLLogger.logQueryStart(sql); + + // 模拟查询执行 + try { + Thread.sleep(100); // 模拟执行时间 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + SQLLogger.logQueryEnd(queryId, 10, true); + + LOGGER.info("Logged query end for ID: {}", queryId); + } + + @Test + public void testLogQueryException() { + String sql = "SELECT * FROM non_existent_table"; + String queryId = SQLLogger.logQueryStart(sql); + + // 模拟查询执行 + try { + Thread.sleep(50); // 模拟执行时间 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + SQLException exception = new SQLException("Table does not exist", "42S02", 1146); + SQLLogger.logQueryException(queryId, exception); + + LOGGER.info("Logged query exception for ID: {}", queryId); + } + + @Test + public void testLogBatchOperation() { + String operation = "BATCH_INSERT"; + int batchSize = 1000; + long executionTime = 500; + + SQLLogger.logBatchOperation(operation, batchSize, executionTime); + + LOGGER.info("Logged batch operation: {} with size {} and time {}ms", + operation, batchSize, executionTime); + } + + @Test + public void testLogConnectionInfo() { + String connectionId = "CONN_001"; + String operation = "CONNECT"; + String details = "User: test, Database: phoenix"; + + SQLLogger.logConnectionInfo(connectionId, operation, details); + + LOGGER.info("Logged connection info: {} {} {}", connectionId, operation, details); + } + + @Test + public void testSetSlowQueryThreshold() { + SQLLogger.setSlowQueryThreshold("SELECT", 1000); + SQLLogger.setSlowQueryThreshold("INSERT", 2000); + SQLLogger.setSlowQueryThreshold("UPDATE", 1500); + + LOGGER.info("Set slow query thresholds for different SQL types"); + } + + @Test + public void testMultipleQueryLogging() { + // 测试多个查询的日志记录 + String[] queries = { + "SELECT COUNT(*) FROM users", + "INSERT INTO users (id, name) VALUES (1, 'test')", + "UPDATE users SET name = 'updated' WHERE id = 1", + "DELETE FROM users WHERE id = 1" + }; + + for (int i = 0; i < queries.length; i++) { + String queryId = SQLLogger.logQueryStart(queries[i]); + + // 模拟执行时间 + try { + Thread.sleep(50 + i * 10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + SQLLogger.logQueryEnd(queryId, i + 1, true); + + LOGGER.info("Completed query {}: {}", i + 1, queries[i]); + } + } + + @Test + public void testSlowQueryDetection() { + // 设置慢查询阈值 + SQLLogger.setSlowQueryThreshold("SELECT", 100); + + String queryId = SQLLogger.logQueryStart("SELECT * FROM large_table"); + + // 模拟慢查询 + try { + Thread.sleep(200); // 超过阈值 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + SQLLogger.logQueryEnd(queryId, 1000, true); + + LOGGER.info("Tested slow query detection for ID: {}", queryId); + } + + @Test + public void testQueryWithLongSQL() { + // 测试长SQL语句的日志记录 + StringBuilder longSql = new StringBuilder(); + longSql.append("SELECT u.id, u.name, u.email, p.title, p.content, c.comment "); + longSql.append("FROM users u "); + longSql.append("LEFT JOIN posts p ON u.id = p.user_id "); + longSql.append("LEFT JOIN comments c ON p.id = c.post_id "); + longSql.append("WHERE u.status = 'active' "); + longSql.append("AND p.published_date > '2023-01-01' "); + longSql.append("ORDER BY u.name, p.published_date DESC "); + longSql.append("LIMIT 100"); + + String queryId = SQLLogger.logQueryStart(longSql.toString()); + + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + SQLLogger.logQueryEnd(queryId, 50, true); + + LOGGER.info("Tested long SQL query logging for ID: {}", queryId); + } + + @Test + public void testConcurrentQueryLogging() throws InterruptedException { + // 测试并发查询日志记录 + int threadCount = 5; + Thread[] threads = new Thread[threadCount]; + + for (int i = 0; i < threadCount; i++) { + final int threadId = i; + threads[i] = new Thread(() -> { + String queryId = SQLLogger.logQueryStart("SELECT * FROM table_" + threadId); + + try { + Thread.sleep(50 + threadId * 10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + SQLLogger.logQueryEnd(queryId, threadId + 1, true); + + LOGGER.info("Thread {} completed query with ID: {}", threadId, queryId); + }); + } + + // 启动所有线程 + for (Thread thread : threads) { + thread.start(); + } + + // 等待所有线程完成 + for (Thread thread : threads) { + thread.join(); + } + + LOGGER.info("Completed concurrent query logging test"); + } +} \ No newline at end of file diff --git a/phoenix-core/src/test/resources/log4j2-test.properties b/phoenix-core/src/test/resources/log4j2-test.properties index b1c708cc066..7987e833156 100644 --- a/phoenix-core/src/test/resources/log4j2-test.properties +++ b/phoenix-core/src/test/resources/log4j2-test.properties @@ -28,6 +28,22 @@ appender.console.maxSize = 1G appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{ISO8601} %-5p [%t] %C{2}(%L): %m%n +# SQL执行日志文件appender +appender.sql.type = File +appender.sql.name = SQLFile +appender.sql.fileName = ${sys:phoenix.log.dir:-logs}/phoenix-sql-execution.log +appender.sql.filePattern = ${sys:phoenix.log.dir:-logs}/phoenix-sql-execution-%d{yyyy-MM-dd}-%i.log.gz +appender.sql.layout.type = PatternLayout +appender.sql.layout.pattern = %d{ISO8601} %-5p [%t] %C{2}(%L): %m%n +appender.sql.policies.type = Policies +appender.sql.policies.time.type = TimeBasedTriggeringPolicy +appender.sql.policies.time.interval = 1 +appender.sql.policies.time.modulate = true +appender.sql.policies.size.type = SizeBasedTriggeringPolicy +appender.sql.policies.size.size = 100MB +appender.sql.strategy.type = DefaultRolloverStrategy +appender.sql.strategy.max = 10 + rootLogger = DEBUG,Console # TODO review settings below @@ -48,3 +64,21 @@ logger.directory.level = WARN logger.ehcache.name = net.sf.ehcache logger.ehcache.level = WARN + +# SQL执行日志配置 +logger.sql.name = org.apache.phoenix.jdbc +logger.sql.level = INFO +logger.sql.additivity = false +logger.sql.appenderRef.sql.ref = SQLFile + +# 查询执行时间日志 +logger.queryTime.name = org.apache.phoenix.query +logger.queryTime.level = INFO +logger.queryTime.additivity = false +logger.queryTime.appenderRef.sql.ref = SQLFile + +# 性能监控日志 +logger.performance.name = org.apache.phoenix.monitoring +logger.performance.level = INFO +logger.performance.additivity = false +logger.performance.appenderRef.sql.ref = SQLFile