1717 */
1818package org .apache .hadoop .hbase .mob ;
1919
20+ import static org .apache .hadoop .hbase .mob .MobConstants .DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT ;
21+ import static org .apache .hadoop .hbase .mob .MobConstants .MOB_FILE_CLEANER_CHORE_TIME_OUT ;
22+
23+ import com .google .errorprone .annotations .RestrictedApi ;
2024import java .io .IOException ;
25+ import java .util .ArrayList ;
26+ import java .util .List ;
2127import java .util .Map ;
28+ import java .util .concurrent .ExecutionException ;
29+ import java .util .concurrent .Future ;
30+ import java .util .concurrent .LinkedBlockingQueue ;
31+ import java .util .concurrent .ThreadFactory ;
32+ import java .util .concurrent .ThreadPoolExecutor ;
2233import java .util .concurrent .TimeUnit ;
34+ import java .util .concurrent .TimeoutException ;
2335import org .apache .hadoop .conf .Configuration ;
2436import org .apache .hadoop .hbase .ScheduledChore ;
2537import org .apache .hadoop .hbase .TableDescriptors ;
2638import org .apache .hadoop .hbase .client .Admin ;
2739import org .apache .hadoop .hbase .client .ColumnFamilyDescriptor ;
2840import org .apache .hadoop .hbase .client .TableDescriptor ;
41+ import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
2942import org .apache .hadoop .hbase .master .HMaster ;
3043import org .apache .yetus .audience .InterfaceAudience ;
3144import org .slf4j .Logger ;
3245import org .slf4j .LoggerFactory ;
3346
47+ import org .apache .hbase .thirdparty .com .google .common .util .concurrent .ThreadFactoryBuilder ;
48+
3449/**
3550 * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
3651 * (files which have no active references to) mob files.
3752 */
3853@ InterfaceAudience .Private
39- public class MobFileCleanerChore extends ScheduledChore {
54+ public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver {
4055
4156 private static final Logger LOG = LoggerFactory .getLogger (MobFileCleanerChore .class );
57+
4258 private final HMaster master ;
43- private ExpiredMobFileCleaner cleaner ;
59+ private final ExpiredMobFileCleaner cleaner ;
60+ private final ThreadPoolExecutor executor ;
61+ private final int cleanerFutureTimeout ;
62+ private int threadCount ;
4463
4564 public MobFileCleanerChore (HMaster master ) {
4665 super (master .getServerName () + "-MobFileCleanerChore" , master ,
@@ -52,7 +71,21 @@ public MobFileCleanerChore(HMaster master) {
5271 this .master = master ;
5372 cleaner = new ExpiredMobFileCleaner ();
5473 cleaner .setConf (master .getConfiguration ());
74+ threadCount = master .getConfiguration ().getInt (MobConstants .MOB_CLEANER_THREAD_COUNT ,
75+ MobConstants .DEFAULT_MOB_CLEANER_THREAD_COUNT );
76+ if (threadCount < 1 ) {
77+ threadCount = 1 ;
78+ }
79+
80+ ThreadFactory threadFactory =
81+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("mobfile-cleaner-pool-%d" ).build ();
82+
83+ executor = new ThreadPoolExecutor (threadCount , threadCount , 60 , TimeUnit .SECONDS ,
84+ new LinkedBlockingQueue <Runnable >(), threadFactory );
85+
5586 checkObsoleteConfigurations ();
87+ cleanerFutureTimeout = master .getConfiguration ().getInt (MOB_FILE_CLEANER_CHORE_TIME_OUT ,
88+ DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT );
5689 }
5790
5891 private void checkObsoleteConfigurations () {
@@ -83,29 +116,93 @@ protected void chore() {
83116 LOG .error ("MobFileCleanerChore failed" , e );
84117 return ;
85118 }
119+ List <Future <?>> futureList = new ArrayList <>(map .size ());
86120 for (TableDescriptor htd : map .values ()) {
87- for (ColumnFamilyDescriptor hcd : htd .getColumnFamilies ()) {
88- if (hcd .isMobEnabled () && hcd .getMinVersions () == 0 ) {
89- try {
90- cleaner .cleanExpiredMobFiles (htd , hcd );
91- } catch (IOException e ) {
92- LOG .error ("Failed to clean the expired mob files table={} family={}" ,
93- htd .getTableName ().getNameAsString (), hcd .getNameAsString (), e );
94- }
95- }
96- }
121+ Future <?> future = executor .submit (() -> handleOneTable (htd ));
122+ futureList .add (future );
123+ }
124+
125+ for (Future <?> future : futureList ) {
97126 try {
98- // Now clean obsolete files for a table
99- LOG .info ("Cleaning obsolete MOB files from table={}" , htd .getTableName ());
100- try (final Admin admin = master .getConnection ().getAdmin ()) {
101- MobFileCleanupUtil .cleanupObsoleteMobFiles (master .getConfiguration (), htd .getTableName (),
102- admin );
127+ future .get (cleanerFutureTimeout , TimeUnit .SECONDS );
128+ } catch (InterruptedException e ) {
129+ LOG .warn ("MobFileCleanerChore interrupted while waiting for futures" , e );
130+ Thread .currentThread ().interrupt ();
131+ cancelAllFutures (futureList );
132+ break ;
133+ } catch (ExecutionException e ) {
134+ LOG .error ("Exception during execution of MobFileCleanerChore task" , e );
135+ } catch (TimeoutException e ) {
136+ LOG .error ("MobFileCleanerChore timed out waiting for a task to complete" , e );
137+ }
138+ }
139+ }
140+
141+ private void cancelAllFutures (List <Future <?>> futureList ) {
142+ long pendingTaskCounter = 0 ;
143+ for (Future <?> f : futureList ) {
144+ if (!f .isDone ()) {
145+ f .cancel (true ); // interrupt running tasks
146+ pendingTaskCounter ++;
147+ }
148+ }
149+ LOG .info ("Cancelled {} pending mob file cleaner tasks" , pendingTaskCounter );
150+ }
151+
152+ private void handleOneTable (TableDescriptor htd ) {
153+ for (ColumnFamilyDescriptor hcd : htd .getColumnFamilies ()) {
154+ if (hcd .isMobEnabled () && hcd .getMinVersions () == 0 ) {
155+ try {
156+ cleaner .cleanExpiredMobFiles (htd , hcd );
157+ } catch (IOException e ) {
158+ LOG .error ("Failed to clean the expired mob files table={} family={}" ,
159+ htd .getTableName ().getNameAsString (), hcd .getNameAsString (), e );
103160 }
104- LOG .info ("Cleaning obsolete MOB files finished for table={}" , htd .getTableName ());
105- } catch (IOException e ) {
106- LOG .error ("Failed to clean the obsolete mob files for table={}" , htd .getTableName (), e );
107161 }
108162 }
163+ try {
164+ // Now clean obsolete files for a table
165+ LOG .info ("Cleaning obsolete MOB files from table={}" , htd .getTableName ());
166+ try (final Admin admin = master .getConnection ().getAdmin ()) {
167+ MobFileCleanupUtil .cleanupObsoleteMobFiles (master .getConfiguration (), htd .getTableName (),
168+ admin );
169+ }
170+ LOG .info ("Cleaning obsolete MOB files finished for table={}" , htd .getTableName ());
171+ } catch (IOException e ) {
172+ LOG .error ("Failed to clean the obsolete mob files for table={}" , htd .getTableName (), e );
173+ }
174+ }
175+
176+ @ Override
177+ public void onConfigurationChange (Configuration conf ) {
178+ int newThreadCount = conf .getInt (MobConstants .MOB_CLEANER_THREAD_COUNT ,
179+ MobConstants .DEFAULT_MOB_CLEANER_THREAD_COUNT );
180+ if (newThreadCount < 1 ) {
181+ return ; // invalid value , skip the config change
182+ }
183+
184+ if (newThreadCount != threadCount ) {
185+ resizeThreadPool (newThreadCount , newThreadCount );
186+ threadCount = newThreadCount ;
187+ }
109188 }
110189
190+ private void resizeThreadPool (int newCoreSize , int newMaxSize ) {
191+ int currentCoreSize = executor .getCorePoolSize ();
192+ if (newCoreSize > currentCoreSize ) {
193+ // Increasing the pool size: Set max first, then core
194+ executor .setMaximumPoolSize (newMaxSize );
195+ executor .setCorePoolSize (newCoreSize );
196+ } else {
197+ // Decreasing the pool size: Set core first, then max
198+ executor .setCorePoolSize (newCoreSize );
199+ executor .setMaximumPoolSize (newMaxSize );
200+ }
201+ }
202+
203+ @ RestrictedApi (explanation = "Should only be called in tests" , link = "" ,
204+ allowedOnPath = ".*/src/test/.*" )
205+ public ThreadPoolExecutor getExecutor () {
206+ return executor ;
207+ }
111208}
0 commit comments