Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4428,4 +4428,10 @@ protected String getDescription() {
}
});
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public MobFileCleanerChore getMobFileCleanerChore() {
return mobFileCleanerChore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public final class MobConstants {
public static final String MOB_COMPACTION_THREADS_MAX = "hbase.mob.compaction.threads.max";
public static final int DEFAULT_MOB_COMPACTION_THREADS_MAX = 1;

public static final String MOB_CLEANER_THREAD_COUNT = "hbase.master.mob.cleaner.threads";
public static final int DEFAULT_MOB_CLEANER_THREAD_COUNT = 1;
public static final String MOB_FILE_CLEANER_CHORE_TIME_OUT =
"hbase.master.mob.cleaner.chore.timeout";
public static final int DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT = 5 * 60; // 5 minutes

private MobConstants() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,49 @@
*/
package org.apache.hadoop.hbase.mob;

import static org.apache.hadoop.hbase.mob.MobConstants.DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT;
import static org.apache.hadoop.hbase.mob.MobConstants.MOB_FILE_CLEANER_CHORE_TIME_OUT;

import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
* (files which have no active references to) mob files.
*/
@InterfaceAudience.Private
public class MobFileCleanerChore extends ScheduledChore {
public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver {

private static final Logger LOG = LoggerFactory.getLogger(MobFileCleanerChore.class);

private final HMaster master;
private ExpiredMobFileCleaner cleaner;
private final ExpiredMobFileCleaner cleaner;
private final ThreadPoolExecutor executor;
private final int cleanerFutureTimeout;
private int threadCount;

static {
Configuration.addDeprecation(MobConstants.DEPRECATED_MOB_CLEANER_PERIOD,
Expand All @@ -57,7 +76,21 @@ public MobFileCleanerChore(HMaster master) {
this.master = master;
cleaner = new ExpiredMobFileCleaner();
cleaner.setConf(master.getConfiguration());
threadCount = master.getConfiguration().getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
if (threadCount < 1) {
threadCount = 1;
}

ThreadFactory threadFactory =
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("mobfile-cleaner-pool-%d").build();

executor = new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);

checkObsoleteConfigurations();
cleanerFutureTimeout = master.getConfiguration().getInt(MOB_FILE_CLEANER_CHORE_TIME_OUT,
DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT);
}

private void checkObsoleteConfigurations() {
Expand Down Expand Up @@ -88,29 +121,93 @@ protected void chore() {
LOG.error("MobFileCleanerChore failed", e);
return;
}
List<Future<?>> futureList = new ArrayList<>(map.size());
for (TableDescriptor htd : map.values()) {
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
try {
cleaner.cleanExpiredMobFiles(htd, hcd);
} catch (IOException e) {
LOG.error("Failed to clean the expired mob files table={} family={}",
htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
}
}
}
Future<?> future = executor.submit(() -> handleOneTable(htd));
futureList.add(future);
}

for (Future<?> future : futureList) {
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
try (final Admin admin = master.getConnection().getAdmin()) {
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
admin);
future.get(cleanerFutureTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("MobFileCleanerChore interrupted while waiting for futures", e);
Thread.currentThread().interrupt();
cancelAllFutures(futureList);
break;
} catch (ExecutionException e) {
LOG.error("Exception during execution of MobFileCleanerChore task", e);
} catch (TimeoutException e) {
LOG.error("MobFileCleanerChore timed out waiting for a task to complete", e);
}
}
}

private void cancelAllFutures(List<Future<?>> futureList) {
long pendingTaskCounter = 0;
for (Future<?> f : futureList) {
if (!f.isDone()) {
f.cancel(true); // interrupt running tasks
pendingTaskCounter++;
}
}
LOG.info("Cancelled {} pending mob file cleaner tasks", pendingTaskCounter);
}

private void handleOneTable(TableDescriptor htd) {
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
if (hcd.isMobEnabled() && hcd.getMinVersions() == 0) {
try {
cleaner.cleanExpiredMobFiles(htd, hcd);
} catch (IOException e) {
LOG.error("Failed to clean the expired mob files table={} family={}",
htd.getTableName().getNameAsString(), hcd.getNameAsString(), e);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
}
}
try {
// Now clean obsolete files for a table
LOG.info("Cleaning obsolete MOB files from table={}", htd.getTableName());
try (final Admin admin = master.getConnection().getAdmin()) {
MobFileCleanupUtil.cleanupObsoleteMobFiles(master.getConfiguration(), htd.getTableName(),
admin);
}
LOG.info("Cleaning obsolete MOB files finished for table={}", htd.getTableName());
} catch (IOException e) {
LOG.error("Failed to clean the obsolete mob files for table={}", htd.getTableName(), e);
}
}

@Override
public void onConfigurationChange(Configuration conf) {
int newThreadCount = conf.getInt(MobConstants.MOB_CLEANER_THREAD_COUNT,
MobConstants.DEFAULT_MOB_CLEANER_THREAD_COUNT);
if (newThreadCount < 1) {
return; // invalid value , skip the config change
}

if (newThreadCount != threadCount) {
resizeThreadPool(newThreadCount, newThreadCount);
threadCount = newThreadCount;
}
}

private void resizeThreadPool(int newCoreSize, int newMaxSize) {
int currentCoreSize = executor.getCorePoolSize();
if (newCoreSize > currentCoreSize) {
// Increasing the pool size: Set max first, then core
executor.setMaximumPoolSize(newMaxSize);
executor.setCorePoolSize(newCoreSize);
} else {
// Decreasing the pool size: Set core first, then max
executor.setCorePoolSize(newCoreSize);
executor.setMaximumPoolSize(newMaxSize);
}
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public ThreadPoolExecutor getExecutor() {
return executor;
}
}
Loading