1717 */
1818package org .apache .hadoop .hbase .quotas ;
1919
20- import static org .apache .hadoop .hbase .util .ConcurrentMapUtils .computeIfAbsent ;
21-
2220import java .io .IOException ;
2321import java .time .Duration ;
2422import java .util .ArrayList ;
3028import java .util .concurrent .ConcurrentHashMap ;
3129import java .util .concurrent .ConcurrentMap ;
3230import java .util .concurrent .TimeUnit ;
31+ import java .util .stream .Collectors ;
3332import org .apache .hadoop .conf .Configuration ;
3433import org .apache .hadoop .hbase .ClusterMetrics ;
3534import org .apache .hadoop .hbase .ClusterMetrics .Option ;
5655
5756/**
5857 * Cache that keeps track of the quota settings for the users and tables that are interacting with
59- * it. To avoid blocking the operations if the requested quota is not in cache an "empty quota" will
60- * be returned and the request to fetch the quota information will be enqueued for the next refresh.
61- * TODO: At the moment the Cache has a Chore that will be triggered every 5min or on cache-miss
62- * events. Later the Quotas will be pushed using the notification system.
58+ * it.
6359 */
6460@ InterfaceAudience .Private
6561@ InterfaceStability .Evolving
@@ -100,6 +96,62 @@ public class QuotaCache implements Stoppable {
10096 private QuotaRefresherChore refreshChore ;
10197 private boolean stopped = true ;
10298
99+ private final Fetcher <String , UserQuotaState > userQuotaStateFetcher =
100+ new Fetcher <String , UserQuotaState >() {
101+ @ Override
102+ public Get makeGet (final String user ) {
103+ final Set <String > namespaces = QuotaCache .this .namespaceQuotaCache .keySet ();
104+ final Set <TableName > tables = QuotaCache .this .tableQuotaCache .keySet ();
105+ return QuotaUtil .makeGetForUserQuotas (user , tables , namespaces );
106+ }
107+
108+ @ Override
109+ public Map <String , UserQuotaState > fetchEntries (final List <Get > gets ) throws IOException {
110+ return QuotaUtil .fetchUserQuotas (rsServices .getConnection (), gets , tableMachineQuotaFactors ,
111+ machineQuotaFactor );
112+ }
113+ };
114+
115+ private final Fetcher <String , QuotaState > regionServerQuotaStateFetcher =
116+ new Fetcher <String , QuotaState >() {
117+ @ Override
118+ public Get makeGet (final String regionServer ) {
119+ return QuotaUtil .makeGetForRegionServerQuotas (regionServer );
120+ }
121+
122+ @ Override
123+ public Map <String , QuotaState > fetchEntries (final List <Get > gets ) throws IOException {
124+ return QuotaUtil .fetchRegionServerQuotas (rsServices .getConnection (), gets );
125+ }
126+ };
127+
128+ private final Fetcher <TableName , QuotaState > tableQuotaStateFetcher =
129+ new Fetcher <TableName , QuotaState >() {
130+ @ Override
131+ public Get makeGet (final TableName table ) {
132+ return QuotaUtil .makeGetForTableQuotas (table );
133+ }
134+
135+ @ Override
136+ public Map <TableName , QuotaState > fetchEntries (final List <Get > gets ) throws IOException {
137+ return QuotaUtil .fetchTableQuotas (rsServices .getConnection (), gets ,
138+ tableMachineQuotaFactors );
139+ }
140+ };
141+
142+ private final Fetcher <String , QuotaState > namespaceQuotaStateFetcher =
143+ new Fetcher <String , QuotaState >() {
144+ @ Override
145+ public Get makeGet (final String namespace ) {
146+ return QuotaUtil .makeGetForNamespaceQuotas (namespace );
147+ }
148+
149+ @ Override
150+ public Map <String , QuotaState > fetchEntries (final List <Get > gets ) throws IOException {
151+ return QuotaUtil .fetchNamespaceQuotas (rsServices .getConnection (), gets , machineQuotaFactor );
152+ }
153+ };
154+
103155 public QuotaCache (final RegionServerServices rsServices ) {
104156 this .rsServices = rsServices ;
105157 this .userOverrideRequestAttributeKey =
@@ -153,8 +205,13 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
153205 * @return the quota info associated to specified user
154206 */
155207 public UserQuotaState getUserQuotaState (final UserGroupInformation ugi ) {
156- return computeIfAbsent (userQuotaCache , getQuotaUserName (ugi ),
157- () -> QuotaUtil .buildDefaultUserQuotaState (rsServices .getConfiguration (), 0L ));
208+ String user = getQuotaUserName (ugi );
209+ if (!userQuotaCache .containsKey (user )) {
210+ userQuotaCache .put (user ,
211+ QuotaUtil .buildDefaultUserQuotaState (rsServices .getConfiguration (), 0L ));
212+ fetch ("user" , userQuotaCache , userQuotaStateFetcher );
213+ }
214+ return userQuotaCache .get (user );
158215 }
159216
160217 /**
@@ -163,7 +220,11 @@ public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
163220 * @return the limiter associated to the specified table
164221 */
165222 public QuotaLimiter getTableLimiter (final TableName table ) {
166- return getQuotaState (this .tableQuotaCache , table ).getGlobalLimiter ();
223+ if (!tableQuotaCache .containsKey (table )) {
224+ tableQuotaCache .put (table , new QuotaState ());
225+ fetch ("table" , tableQuotaCache , tableQuotaStateFetcher );
226+ }
227+ return tableQuotaCache .get (table ).getGlobalLimiter ();
167228 }
168229
169230 /**
@@ -172,7 +233,11 @@ public QuotaLimiter getTableLimiter(final TableName table) {
172233 * @return the limiter associated to the specified namespace
173234 */
174235 public QuotaLimiter getNamespaceLimiter (final String namespace ) {
175- return getQuotaState (this .namespaceQuotaCache , namespace ).getGlobalLimiter ();
236+ if (!namespaceQuotaCache .containsKey (namespace )) {
237+ namespaceQuotaCache .put (namespace , new QuotaState ());
238+ fetch ("namespace" , namespaceQuotaCache , namespaceQuotaStateFetcher );
239+ }
240+ return namespaceQuotaCache .get (namespace ).getGlobalLimiter ();
176241 }
177242
178243 /**
@@ -181,13 +246,41 @@ public QuotaLimiter getNamespaceLimiter(final String namespace) {
181246 * @return the limiter associated to the specified region server
182247 */
183248 public QuotaLimiter getRegionServerQuotaLimiter (final String regionServer ) {
184- return getQuotaState (this .regionServerQuotaCache , regionServer ).getGlobalLimiter ();
249+ if (!regionServerQuotaCache .containsKey (regionServer )) {
250+ regionServerQuotaCache .put (regionServer , new QuotaState ());
251+ fetch ("regionServer" , regionServerQuotaCache , regionServerQuotaStateFetcher );
252+ }
253+ return regionServerQuotaCache .get (regionServer ).getGlobalLimiter ();
185254 }
186255
187256 protected boolean isExceedThrottleQuotaEnabled () {
188257 return exceedThrottleQuotaEnabled ;
189258 }
190259
260+ private <K , V extends QuotaState > void fetch (final String type , final Map <K , V > quotasMap ,
261+ final Fetcher <K , V > fetcher ) {
262+ // Find the quota entries to update
263+ List <Get > gets = quotasMap .keySet ().stream ().map (fetcher ::makeGet ).collect (Collectors .toList ());
264+
265+ // fetch and update the quota entries
266+ if (!gets .isEmpty ()) {
267+ try {
268+ for (Map .Entry <K , V > entry : fetcher .fetchEntries (gets ).entrySet ()) {
269+ V quotaInfo = quotasMap .putIfAbsent (entry .getKey (), entry .getValue ());
270+ if (quotaInfo != null ) {
271+ quotaInfo .update (entry .getValue ());
272+ }
273+
274+ if (LOG .isTraceEnabled ()) {
275+ LOG .trace ("Loading {} key={} quotas={}" , type , entry .getKey (), quotaInfo );
276+ }
277+ }
278+ } catch (IOException e ) {
279+ LOG .warn ("Unable to read {} from quota table" , type , e );
280+ }
281+ }
282+ }
283+
191284 /**
192285 * Applies a request attribute user override if available, otherwise returns the UGI's short
193286 * username
@@ -210,14 +303,6 @@ private String getQuotaUserName(final UserGroupInformation ugi) {
210303 return Bytes .toString (override );
211304 }
212305
213- /**
214- * Returns the QuotaState requested. If the quota info is not in cache an empty one will be
215- * returned and the quota request will be enqueued for the next cache refresh.
216- */
217- private <K > QuotaState getQuotaState (final ConcurrentMap <K , QuotaState > quotasMap , final K key ) {
218- return computeIfAbsent (quotasMap , key , QuotaState ::new );
219- }
220-
221306 void triggerCacheRefresh () {
222307 refreshChore .triggerNow ();
223308 }
@@ -226,10 +311,6 @@ void forceSynchronousCacheRefresh() {
226311 refreshChore .chore ();
227312 }
228313
229- long getLastUpdate () {
230- return refreshChore .lastUpdate ;
231- }
232-
233314 Map <String , QuotaState > getNamespaceQuotaCache () {
234315 return namespaceQuotaCache ;
235316 }
@@ -248,8 +329,6 @@ Map<String, UserQuotaState> getUserQuotaCache() {
248329
249330 // TODO: Remove this once we have the notification bus
250331 private class QuotaRefresherChore extends ScheduledChore {
251- private long lastUpdate = 0 ;
252-
253332 // Querying cluster metrics so often, per-RegionServer, limits horizontal scalability.
254333 // So we cache the results to reduce that load.
255334 private final RefreshableExpiringValueCache <ClusterMetrics > tableRegionStatesClusterMetrics ;
@@ -307,74 +386,12 @@ protected void chore() {
307386 .computeIfAbsent (QuotaTableUtil .QUOTA_REGION_SERVER_ROW_KEY , key -> new QuotaState ());
308387
309388 updateQuotaFactors ();
310- fetchNamespaceQuotaState ();
311- fetchTableQuotaState ();
312- fetchUserQuotaState ();
313- fetchRegionServerQuotaState ();
389+ fetchAndEvict ("namespace" , QuotaCache .this .namespaceQuotaCache , namespaceQuotaStateFetcher );
390+ fetchAndEvict ("table" , QuotaCache .this .tableQuotaCache , tableQuotaStateFetcher );
391+ fetchAndEvict ("user" , QuotaCache .this .userQuotaCache , userQuotaStateFetcher );
392+ fetchAndEvict ("regionServer" , QuotaCache .this .regionServerQuotaCache ,
393+ regionServerQuotaStateFetcher );
314394 fetchExceedThrottleQuota ();
315- lastUpdate = EnvironmentEdgeManager .currentTime ();
316- }
317-
318- private void fetchNamespaceQuotaState () {
319- fetch ("namespace" , QuotaCache .this .namespaceQuotaCache , new Fetcher <String , QuotaState >() {
320- @ Override
321- public Get makeGet (final Map .Entry <String , QuotaState > entry ) {
322- return QuotaUtil .makeGetForNamespaceQuotas (entry .getKey ());
323- }
324-
325- @ Override
326- public Map <String , QuotaState > fetchEntries (final List <Get > gets ) throws IOException {
327- return QuotaUtil .fetchNamespaceQuotas (rsServices .getConnection (), gets ,
328- machineQuotaFactor );
329- }
330- });
331- }
332-
333- private void fetchTableQuotaState () {
334- fetch ("table" , QuotaCache .this .tableQuotaCache , new Fetcher <TableName , QuotaState >() {
335- @ Override
336- public Get makeGet (final Map .Entry <TableName , QuotaState > entry ) {
337- return QuotaUtil .makeGetForTableQuotas (entry .getKey ());
338- }
339-
340- @ Override
341- public Map <TableName , QuotaState > fetchEntries (final List <Get > gets ) throws IOException {
342- return QuotaUtil .fetchTableQuotas (rsServices .getConnection (), gets ,
343- tableMachineQuotaFactors );
344- }
345- });
346- }
347-
348- private void fetchUserQuotaState () {
349- final Set <String > namespaces = QuotaCache .this .namespaceQuotaCache .keySet ();
350- final Set <TableName > tables = QuotaCache .this .tableQuotaCache .keySet ();
351- fetch ("user" , QuotaCache .this .userQuotaCache , new Fetcher <String , UserQuotaState >() {
352- @ Override
353- public Get makeGet (final Map .Entry <String , UserQuotaState > entry ) {
354- return QuotaUtil .makeGetForUserQuotas (entry .getKey (), tables , namespaces );
355- }
356-
357- @ Override
358- public Map <String , UserQuotaState > fetchEntries (final List <Get > gets ) throws IOException {
359- return QuotaUtil .fetchUserQuotas (rsServices .getConnection (), gets ,
360- tableMachineQuotaFactors , machineQuotaFactor );
361- }
362- });
363- }
364-
365- private void fetchRegionServerQuotaState () {
366- fetch ("regionServer" , QuotaCache .this .regionServerQuotaCache ,
367- new Fetcher <String , QuotaState >() {
368- @ Override
369- public Get makeGet (final Map .Entry <String , QuotaState > entry ) {
370- return QuotaUtil .makeGetForRegionServerQuotas (entry .getKey ());
371- }
372-
373- @ Override
374- public Map <String , QuotaState > fetchEntries (final List <Get > gets ) throws IOException {
375- return QuotaUtil .fetchRegionServerQuotas (rsServices .getConnection (), gets );
376- }
377- });
378395 }
379396
380397 private void fetchExceedThrottleQuota () {
@@ -386,7 +403,7 @@ private void fetchExceedThrottleQuota() {
386403 }
387404 }
388405
389- private <K , V extends QuotaState > void fetch (final String type ,
406+ private <K , V extends QuotaState > void fetchAndEvict (final String type ,
390407 final ConcurrentMap <K , V > quotasMap , final Fetcher <K , V > fetcher ) {
391408 long now = EnvironmentEdgeManager .currentTime ();
392409 long evictPeriod = getPeriod () * EVICT_PERIOD_FACTOR ;
@@ -398,7 +415,7 @@ private <K, V extends QuotaState> void fetch(final String type,
398415 if (lastQuery > 0 && (now - lastQuery ) >= evictPeriod ) {
399416 toRemove .add (entry .getKey ());
400417 } else {
401- gets .add (fetcher .makeGet (entry ));
418+ gets .add (fetcher .makeGet (entry . getKey () ));
402419 }
403420 }
404421
@@ -543,8 +560,8 @@ static interface ThrowingSupplier<T> {
543560 T get () throws Exception ;
544561 }
545562
546- static interface Fetcher <Key , Value > {
547- Get makeGet (Map . Entry < Key , Value > entry );
563+ interface Fetcher <Key , Value > {
564+ Get makeGet (Key key );
548565
549566 Map <Key , Value > fetchEntries (List <Get > gets ) throws IOException ;
550567 }
0 commit comments