PHOENIX-7593: Enable CompactionScanner for flushes #2134

Open · wants to merge 5 commits into master
QueryServices.java
@@ -376,6 +376,10 @@ public interface QueryServices extends SQLCloseable {
public static final String PHOENIX_SERVER_PAGE_SIZE_MS = "phoenix.server.page.size.ms";
// Phoenix TTL implemented by CompactionScanner and TTLRegionScanner is enabled
public static final String PHOENIX_TABLE_TTL_ENABLED = "phoenix.table.ttl.enabled";
// Enable CompactionScanner for flushes to remove extra versions
String PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED =
"phoenix.compaction.scanner.for.flushes.enabled";
boolean DEFAULT_PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED = true;
// Copied here to avoid dependency on hbase-server
public static final String WAL_EDIT_CODEC_ATTRIB = "hbase.regionserver.wal.codec";
//Property to know whether TTL at View Level is enabled
@@ -543,4 +543,11 @@ public static boolean isPhoenixTableTTLEnabled(Configuration conf) {
return conf.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
}

public static boolean isCompactionScannerEnabledForFlushes(Configuration conf) {
boolean isPhoenixTableTTLEnabled = isPhoenixTableTTLEnabled(conf);
return isPhoenixTableTTLEnabled && conf.getBoolean(
QueryServices.PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED,
QueryServices.DEFAULT_PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED);
}
}
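The new flag is deliberately subordinate to the table TTL feature: isCompactionScannerEnabledForFlushes returns true only when phoenix.table.ttl.enabled is also set. A minimal sketch of the gating behavior (the Configuration setup below is illustrative, not part of this PR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.query.QueryServices;

Configuration conf = HBaseConfiguration.create();
// The flush flag defaults to true, but it is ignored unless the
// table TTL feature is enabled as well.
conf.setBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED, false);
// isCompactionScannerEnabledForFlushes(conf) -> false
conf.setBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED, true);
// isCompactionScannerEnabledForFlushes(conf) -> true (both flags on)
conf.setBoolean(QueryServices.PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED, false);
// isCompactionScannerEnabledForFlushes(conf) -> false (explicitly disabled)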
CompactionScanner.java
@@ -65,6 +65,7 @@
import org.apache.phoenix.filter.RowKeyComparisonFilter.RowKeyTuple;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
@@ -144,13 +145,13 @@ public class CompactionScanner implements InternalScanner {
private final long maxLookbackInMillis;
private int minVersion;
private int maxVersion;
private final boolean emptyCFStore;
private boolean emptyCFStore;
private final boolean localIndex;
private final int familyCount;
private KeepDeletedCells keepDeletedCells;
private long compactionTime;
private final byte[] emptyCF;
private final byte[] emptyCQ;
private byte[] emptyCF;
private byte[] emptyCQ;
private final byte[] storeColumnFamily;
private final String tableName;
private final String columnFamilyName;
@@ -162,6 +163,7 @@ public class CompactionScanner implements InternalScanner {
private long outputCellCount = 0;
private boolean phoenixLevelOnly = false;
private boolean isCDCIndex;
private boolean isEmptyCfCqInitialized;

// Only for forcing minor compaction while testing
private static boolean forceMinorCompaction = false;
@@ -180,13 +182,15 @@ public CompactionScanner(RegionCoprocessorEnvironment env,
// The empty column family and qualifier are always needed to compute which empty cells to retain,
// even during minor compactions. If the required empty cells are not retained during
// minor compactions, we run the risk of partial row expiry on the next major compaction.
this.emptyCF = SchemaUtil.getEmptyColumnFamily(table);
this.emptyCQ = SchemaUtil.getEmptyColumnQualifier(table);
this.emptyCF = table != null ? SchemaUtil.getEmptyColumnFamily(table) : null;
this.emptyCQ = table != null ? SchemaUtil.getEmptyColumnQualifier(table) : null;
// PTable will be null only for flushes
this.isEmptyCfCqInitialized = table != null;
compactionTime = EnvironmentEdgeManager.currentTimeMillis();
columnFamilyName = store.getColumnFamilyName();
storeColumnFamily = columnFamilyName.getBytes();
tableName = region.getRegionInfo().getTable().getNameAsString();
String dataTableName = table.getName().toString();
String dataTableName = table != null ? table.getName().toString() : "";
Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName);
this.maxLookbackInMillis = overriddenMaxLookback == null ?
maxLookbackAgeInMillis : Math.max(maxLookbackAgeInMillis, overriddenMaxLookback);
@@ -416,11 +420,41 @@ private void postProcessForConditionalTTL(List<Cell> result) {
}
}

private void determineEmptyCfCq(List<Cell> result) {
// This should be called only once per CompactionScanner instance
assert !isEmptyCfCqInitialized;
byte[] emptyCF = null;
for (Cell cell : result) {
emptyCF = CellUtil.cloneFamily(cell);
if (ScanUtil.isEmptyColumn(cell, emptyCF, QueryConstants.EMPTY_COLUMN_BYTES)) {
emptyCQ = QueryConstants.EMPTY_COLUMN_BYTES;
break;
} else if (ScanUtil.isEmptyColumn(cell, emptyCF,
QueryConstants.ENCODED_EMPTY_COLUMN_BYTES)) {
// The empty column is always encoded in FOUR_BYTE format, since it is a reserved
// qualifier. See EncodedColumnsUtil#isReservedColumnQualifier.
emptyCQ = QueryConstants.ENCODED_EMPTY_COLUMN_BYTES;
break;
}
}
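// Intentional reference comparison below: emptyCQ was assigned one of these
// exact constants in the loop above if an empty cell was found.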
if (emptyCQ == QueryConstants.EMPTY_COLUMN_BYTES
|| emptyCQ == QueryConstants.ENCODED_EMPTY_COLUMN_BYTES) {
this.emptyCF = emptyCF;
this.emptyCFStore = true;
}
this.isEmptyCfCqInitialized = true;
}
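For context, ScanUtil.isEmptyColumn is the test used above against the two possible empty-column qualifiers (unencoded and FOUR_BYTE-encoded). A rough sketch of what that check amounts to, assumed here for illustration rather than copied from ScanUtil:

// Assumed shape of the utility used above: a byte-wise match of the cell's
// family and qualifier against the empty column's coordinates.
static boolean isEmptyColumn(Cell cell, byte[] emptyCF, byte[] emptyCQ) {
    return CellUtil.matchingFamily(cell, emptyCF)
            && CellUtil.matchingQualifier(cell, emptyCQ);
}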

@Override
public boolean next(List<Cell> result) throws IOException {
boolean hasMore = storeScanner.next(result);
inputCellCount += result.size();
if (!result.isEmpty()) {
// This happens only during flushes, since no PTable object is passed
// to the constructor for determining emptyCF and emptyCQ
if (!this.isEmptyCfCqInitialized) {
determineEmptyCfCq(result);
Review comment (Contributor): For a store that is not the empty-CF store, aren't we doing this check on every row?

Reply (Author): Yeah, will fix that. Thanks

}
// This is for debugging
// printRow(result, "Input for " + tableName + " " + columnFamilyName, true, false);
phoenixLevelRowCompactor.compact(result, false);
@@ -2459,7 +2493,8 @@ private void getLastRowVersionInMaxLookbackWindow(List<Cell> result,
}
if (cell.getType() == Cell.Type.Put) {
lastRowVersion.add(cell);
if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
if (emptyCF != null && emptyCQ != null
&& ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
index = addEmptyColumn(result, currentColumnCell, index, emptyColumn);
} else {
index = skipColumn(result, currentColumnCell, retainedCells, index);
@@ -60,6 +60,7 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -611,6 +612,29 @@ private boolean areMutationsInSameTable(Table targetHTable, Region region) {
region.getTableDescriptor().getTableName().getName()) == 0);
}

@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker)
throws IOException {
if (!isCompactionScannerEnabledForFlushes(c.getEnvironment().getConfiguration())) {
return scanner;
} else {
return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
@Override public InternalScanner run() throws Exception {
String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable()
.getNameAsString();
Configuration conf = c.getEnvironment().getConfiguration();
long maxLookbackInMillis =
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf);
maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName,
store.getColumnFamilyName(), maxLookbackInMillis);
return new CompactionScanner(c.getEnvironment(), store, scanner,
maxLookbackInMillis, false, true, null);
}
});
}
}

@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
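Side note on the preFlush body above: PrivilegedExceptionAction is a functional interface, so the anonymous class could be written as a lambda. A behaviorally equivalent sketch (same calls as in the diff, only restructured):

return User.runAsLoginUser(() -> {
    Configuration conf = c.getEnvironment().getConfiguration();
    String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable()
            .getNameAsString();
    long maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName,
            store.getColumnFamilyName(),
            BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
    // A null PTable is passed: during flushes the empty CF/CQ are determined
    // lazily from the first row seen (see determineEmptyCfCq in CompactionScanner).
    return new CompactionScanner(c.getEnvironment(), store, scanner,
            maxLookbackInMillis, false, true, null);
});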
CompactionScannerForFlushesDisabledIT.java (new file)
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import static org.junit.Assert.assertEquals;

@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class CompactionScannerForFlushesDisabledIT extends BaseTest {
private static final int MAX_LOOKBACK_AGE = 15;
private String tableDDLOptions;
private StringBuilder optionBuilder;
ManualEnvironmentEdge injectEdge;
private int ttl;
private boolean multiCF;

public CompactionScannerForFlushesDisabledIT(boolean multiCF) {
this.multiCF = multiCF;
}

@BeforeClass
public static synchronized void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE));
props.put(QueryServices.PHOENIX_COMPACTION_SCANNER_FOR_FLUSHES_ENABLED, "false");
props.put("hbase.procedure.remote.dispatcher.delay.msec", "0");
props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}

@Before
public void beforeTest(){
EnvironmentEdgeManager.reset();
optionBuilder = new StringBuilder();
ttl = 30;
optionBuilder.append(" TTL=" + ttl);
this.tableDDLOptions = optionBuilder.toString();
injectEdge = new ManualEnvironmentEdge();
injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis());
}

@After
public synchronized void afterTest() throws Exception {
boolean refCountLeaked = isAnyStoreRefCountLeaked();

EnvironmentEdgeManager.reset();
Assert.assertFalse("refCount leaked", refCountLeaked);
}

@Parameterized.Parameters(name = "CompactionScannerForFlushesDisabledIT_multiCF={0}")
public static Collection<Boolean> data() {
return Arrays.asList(false, true);
}

@Test
public void testRetainingAllRowVersions() throws Exception {
try(Connection conn = DriverManager.getConnection(getUrl())) {
String tableName = generateUniqueName();
createTable(tableName);
long timeIntervalBetweenTwoUpserts = (ttl / 2) + 1;
injectEdge.setValue(System.currentTimeMillis());
EnvironmentEdgeManager.injectEdge(injectEdge);
TableName dataTableName = TableName.valueOf(tableName);
injectEdge.incrementValue(1);
Statement stmt = conn.createStatement();
stmt.execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab1')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab2')");
conn.commit();
injectEdge.incrementValue(timeIntervalBetweenTwoUpserts * 1000);
stmt.execute("upsert into " + tableName + " values ('a', 'ab3')");
conn.commit();
injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);

TestUtil.dumpTable(conn, dataTableName);
byte[] rowKey = Bytes.toBytes("a");
int rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey);
// 6 non-empty cells (ab3, ab2, ab1, ab, abc, abcd) + 4 empty cells (for 4 upserts)
assertEquals(10, rawCellCount);

TestUtil.flush(utility, dataTableName);
injectEdge.incrementValue(1);
rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey);
// The compaction scanner wasn't used during the flush, so the excess row versions were flushed to disk intact
assertEquals(10, rawCellCount);
TestUtil.dumpTable(conn, dataTableName);

TestUtil.majorCompact(getUtility(), dataTableName);
injectEdge.incrementValue(1);
rawCellCount = TestUtil.getRawCellCount(conn, dataTableName, rowKey);
// 1 non-empty cell (ab3) and 1 empty cell at the edge of the max lookback window
// will be retained
// 2 non-empty cells outside the max lookback window will be retained (abc, abcd)
// 2 empty cells outside the max lookback window will be retained
assertEquals(6, rawCellCount);
TestUtil.dumpTable(conn, dataTableName);

ResultSet rs = stmt.executeQuery("select * from " + dataTableName + " where id = 'a'");
while(rs.next()) {
assertEquals("abc", rs.getString(3));
assertEquals("abcd", rs.getString(4));
}
}
}

private void createTable(String tableName) throws SQLException {
try(Connection conn = DriverManager.getConnection(getUrl())) {
String createSql;
if (multiCF) {
createSql = "create table " + tableName +
" (id varchar(10) not null primary key, val1 varchar(10), " +
"a.val2 varchar(10), b.val3 varchar(10))" + tableDDLOptions;
}
else {
createSql = "create table " + tableName +
" (id varchar(10) not null primary key, val1 varchar(10), " +
"val2 varchar(10), val3 varchar(10))" + tableDDLOptions;
}
conn.createStatement().execute(createSql);
conn.commit();
}
}
}
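A natural mirror of this IT (a hypothetical sketch, not part of this file) would leave the flag at its default and assert that the flush alone already trims excess versions. The exact surviving count depends on the retention rules, so only a strict decrease is asserted:

// With phoenix.compaction.scanner.for.flushes.enabled left at its default
// (true), the flush itself runs through CompactionScanner.
TestUtil.flush(utility, dataTableName);
injectEdge.incrementValue(1);
int cellsAfterFlush = TestUtil.getRawCellCount(conn, dataTableName, Bytes.toBytes("a"));
// Exact retention depends on max lookback, TTL and empty-cell rules;
// assert only that some excess versions were removed at flush time.
Assert.assertTrue(cellsAfterFlush < 10);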