1717 */
1818package org .apache .hadoop .hbase .mob ;
1919
20+ import static org .apache .hadoop .hbase .mob .MobConstants .DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT ;
21+ import static org .apache .hadoop .hbase .mob .MobConstants .MOB_FILE_CLEANER_CHORE_TIME_OUT ;
22+
23+ import com .google .errorprone .annotations .RestrictedApi ;
2024import java .io .IOException ;
25+ import java .util .ArrayList ;
26+ import java .util .List ;
2127import java .util .Map ;
28+ import java .util .concurrent .ExecutionException ;
29+ import java .util .concurrent .Future ;
30+ import java .util .concurrent .LinkedBlockingQueue ;
31+ import java .util .concurrent .ThreadFactory ;
32+ import java .util .concurrent .ThreadPoolExecutor ;
2233import java .util .concurrent .TimeUnit ;
34+ import java .util .concurrent .TimeoutException ;
2335import org .apache .hadoop .conf .Configuration ;
2436import org .apache .hadoop .hbase .ScheduledChore ;
2537import org .apache .hadoop .hbase .TableDescriptors ;
2638import org .apache .hadoop .hbase .client .Admin ;
2739import org .apache .hadoop .hbase .client .ColumnFamilyDescriptor ;
2840import org .apache .hadoop .hbase .client .TableDescriptor ;
41+ import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
2942import org .apache .hadoop .hbase .master .HMaster ;
3043import org .apache .yetus .audience .InterfaceAudience ;
3144import org .slf4j .Logger ;
3245import org .slf4j .LoggerFactory ;
3346
47+ import org .apache .hbase .thirdparty .com .google .common .util .concurrent .ThreadFactoryBuilder ;
48+
3449/**
3550 * The class MobFileCleanerChore for running cleaner regularly to remove the expired and obsolete
3651 * (files which have no active references to) mob files.
3752 */
3853@ InterfaceAudience .Private
39- public class MobFileCleanerChore extends ScheduledChore {
54+ public class MobFileCleanerChore extends ScheduledChore implements ConfigurationObserver {
4055
4156 private static final Logger LOG = LoggerFactory .getLogger (MobFileCleanerChore .class );
57+
4258 private final HMaster master ;
43- private ExpiredMobFileCleaner cleaner ;
59+ private final ExpiredMobFileCleaner cleaner ;
60+ private final ThreadPoolExecutor executor ;
61+ private final int cleanerFutureTimeout ;
62+ private int threadCount ;
4463
4564 static {
4665 Configuration .addDeprecation (MobConstants .DEPRECATED_MOB_CLEANER_PERIOD ,
@@ -57,7 +76,21 @@ public MobFileCleanerChore(HMaster master) {
5776 this .master = master ;
5877 cleaner = new ExpiredMobFileCleaner ();
5978 cleaner .setConf (master .getConfiguration ());
79+ threadCount = master .getConfiguration ().getInt (MobConstants .MOB_CLEANER_THREAD_COUNT ,
80+ MobConstants .DEFAULT_MOB_CLEANER_THREAD_COUNT );
81+ if (threadCount < 1 ) {
82+ threadCount = 1 ;
83+ }
84+
85+ ThreadFactory threadFactory =
86+ new ThreadFactoryBuilder ().setDaemon (true ).setNameFormat ("mobfile-cleaner-pool-%d" ).build ();
87+
88+ executor = new ThreadPoolExecutor (threadCount , threadCount , 60 , TimeUnit .SECONDS ,
89+ new LinkedBlockingQueue <Runnable >(), threadFactory );
90+
6091 checkObsoleteConfigurations ();
92+ cleanerFutureTimeout = master .getConfiguration ().getInt (MOB_FILE_CLEANER_CHORE_TIME_OUT ,
93+ DEFAULT_MOB_FILE_CLEANER_CHORE_TIME_OUT );
6194 }
6295
6396 private void checkObsoleteConfigurations () {
@@ -88,29 +121,93 @@ protected void chore() {
88121 LOG .error ("MobFileCleanerChore failed" , e );
89122 return ;
90123 }
124+ List <Future <?>> futureList = new ArrayList <>(map .size ());
91125 for (TableDescriptor htd : map .values ()) {
92- for (ColumnFamilyDescriptor hcd : htd .getColumnFamilies ()) {
93- if (hcd .isMobEnabled () && hcd .getMinVersions () == 0 ) {
94- try {
95- cleaner .cleanExpiredMobFiles (htd , hcd );
96- } catch (IOException e ) {
97- LOG .error ("Failed to clean the expired mob files table={} family={}" ,
98- htd .getTableName ().getNameAsString (), hcd .getNameAsString (), e );
99- }
100- }
101- }
126+ Future <?> future = executor .submit (() -> handleOneTable (htd ));
127+ futureList .add (future );
128+ }
129+
130+ for (Future <?> future : futureList ) {
102131 try {
103- // Now clean obsolete files for a table
104- LOG .info ("Cleaning obsolete MOB files from table={}" , htd .getTableName ());
105- try (final Admin admin = master .getConnection ().getAdmin ()) {
106- MobFileCleanupUtil .cleanupObsoleteMobFiles (master .getConfiguration (), htd .getTableName (),
107- admin );
132+ future .get (cleanerFutureTimeout , TimeUnit .SECONDS );
133+ } catch (InterruptedException e ) {
134+ LOG .warn ("MobFileCleanerChore interrupted while waiting for futures" , e );
135+ Thread .currentThread ().interrupt ();
136+ cancelAllFutures (futureList );
137+ break ;
138+ } catch (ExecutionException e ) {
139+ LOG .error ("Exception during execution of MobFileCleanerChore task" , e );
140+ } catch (TimeoutException e ) {
141+ LOG .error ("MobFileCleanerChore timed out waiting for a task to complete" , e );
142+ }
143+ }
144+ }
145+
146+ private void cancelAllFutures (List <Future <?>> futureList ) {
147+ long pendingTaskCounter = 0 ;
148+ for (Future <?> f : futureList ) {
149+ if (!f .isDone ()) {
150+ f .cancel (true ); // interrupt running tasks
151+ pendingTaskCounter ++;
152+ }
153+ }
154+ LOG .info ("Cancelled {} pending mob file cleaner tasks" , pendingTaskCounter );
155+ }
156+
157+ private void handleOneTable (TableDescriptor htd ) {
158+ for (ColumnFamilyDescriptor hcd : htd .getColumnFamilies ()) {
159+ if (hcd .isMobEnabled () && hcd .getMinVersions () == 0 ) {
160+ try {
161+ cleaner .cleanExpiredMobFiles (htd , hcd );
162+ } catch (IOException e ) {
163+ LOG .error ("Failed to clean the expired mob files table={} family={}" ,
164+ htd .getTableName ().getNameAsString (), hcd .getNameAsString (), e );
108165 }
109- LOG .info ("Cleaning obsolete MOB files finished for table={}" , htd .getTableName ());
110- } catch (IOException e ) {
111- LOG .error ("Failed to clean the obsolete mob files for table={}" , htd .getTableName (), e );
112166 }
113167 }
168+ try {
169+ // Now clean obsolete files for a table
170+ LOG .info ("Cleaning obsolete MOB files from table={}" , htd .getTableName ());
171+ try (final Admin admin = master .getConnection ().getAdmin ()) {
172+ MobFileCleanupUtil .cleanupObsoleteMobFiles (master .getConfiguration (), htd .getTableName (),
173+ admin );
174+ }
175+ LOG .info ("Cleaning obsolete MOB files finished for table={}" , htd .getTableName ());
176+ } catch (IOException e ) {
177+ LOG .error ("Failed to clean the obsolete mob files for table={}" , htd .getTableName (), e );
178+ }
179+ }
180+
181+ @ Override
182+ public void onConfigurationChange (Configuration conf ) {
183+ int newThreadCount = conf .getInt (MobConstants .MOB_CLEANER_THREAD_COUNT ,
184+ MobConstants .DEFAULT_MOB_CLEANER_THREAD_COUNT );
185+ if (newThreadCount < 1 ) {
186+ return ; // invalid value , skip the config change
187+ }
188+
189+ if (newThreadCount != threadCount ) {
190+ resizeThreadPool (newThreadCount , newThreadCount );
191+ threadCount = newThreadCount ;
192+ }
114193 }
115194
195+ private void resizeThreadPool (int newCoreSize , int newMaxSize ) {
196+ int currentCoreSize = executor .getCorePoolSize ();
197+ if (newCoreSize > currentCoreSize ) {
198+ // Increasing the pool size: Set max first, then core
199+ executor .setMaximumPoolSize (newMaxSize );
200+ executor .setCorePoolSize (newCoreSize );
201+ } else {
202+ // Decreasing the pool size: Set core first, then max
203+ executor .setCorePoolSize (newCoreSize );
204+ executor .setMaximumPoolSize (newMaxSize );
205+ }
206+ }
207+
208+ @ RestrictedApi (explanation = "Should only be called in tests" , link = "" ,
209+ allowedOnPath = ".*/src/test/.*" )
210+ public ThreadPoolExecutor getExecutor () {
211+ return executor ;
212+ }
116213}
0 commit comments