diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
index 3e60ceceb6ba..2dda364e845f 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ContainerKeyMapperHelper.java
@@ -167,7 +167,7 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,
     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-        StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+        StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
index 4981083b5025..62c1d54c8ece 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/FileSizeCountTaskHelper.java
@@ -182,7 +182,7 @@ public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-        StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+        StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     } catch (Exception ex) {
       LOG.error("Unable to populate File Size Count for {} in RocksDB.", taskName, ex);
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
index a4853dbf31da..e560fc96e549 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/OmTableInsightTask.java
@@ -65,7 +65,6 @@ public class OmTableInsightTask implements ReconOmTask {
   private Map<String, Long> objectCountMap;
   private Map<String, Long> unReplicatedSizeMap;
   private Map<String, Long> replicatedSizeMap;
-  private final int maxKeysInMemory;
   private final int maxIterators;
 
   @Inject
@@ -80,9 +79,6 @@ public OmTableInsightTask(ReconGlobalStatsManager reconGlobalStatsManager,
     tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler());
     tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler());
     tableHandlers.put(MULTIPART_INFO_TABLE, new MultipartInfoInsightHandler());
-    this.maxKeysInMemory = reconOMMetadataManager.getOzoneConfiguration().getInt(
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY,
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT);
     this.maxIterators = reconOMMetadataManager.getOzoneConfiguration().getInt(
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS,
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT);
@@ -207,8 +203,8 @@ private void processTableInParallel(String tableName, OMMetadataManager omMetadataManager,
     try (ParallelTableIteratorOperation<String, byte[]> parallelIter =
         new ParallelTableIteratorOperation<>(
             omMetadataManager, table, StringCodec.get(),
-            maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
-
+            maxIterators, loggingThreshold)) {
+
       parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
         if (kv != null) {
           count.incrementAndGet();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
index 3c378c36cd9f..b49d4c0de34d 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/util/ParallelTableIteratorOperation.java
@@ -20,7 +20,6 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -28,7 +27,6 @@
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -55,38 +53,27 @@ public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implements Closeable {
   private final Codec<K> keyCodec;
 
   // Thread Pools
-  private final ExecutorService iteratorExecutor;
-  private final ExecutorService valueExecutors;
-
-  private final int maxNumberOfVals;
+  private final ExecutorService iteratorExecutor; // 5
+  private final long maxNumberOfVals;
 
   private final OMMetadataManager metadataManager;
   private final int maxIteratorTasks;
-  private final int maxWorkerTasks;
   private final long logCountThreshold;
 
   private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
 
   public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec,
-                                        int iteratorCount, int workerCount, int maxNumberOfValsInMemory,
-                                        long logThreshold) {
+                                        int iteratorCount, long logThreshold) {
     this.table = table;
     this.keyCodec = keyCodec;
     this.metadataManager = metadataManager;
-    this.maxIteratorTasks = 2 * iteratorCount;
-    this.maxWorkerTasks = workerCount * 2;
+    this.maxIteratorTasks = 2 * iteratorCount; // Allow up to 10 pending iterator tasks
     // Create team of iterator threads with UNLIMITED queue
     // LinkedBlockingQueue() with no size = can hold infinite pending tasks
     this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES,
         new LinkedBlockingQueue<>());
-
-    // Create team of worker threads with UNLIMITED queue
-    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES,
-        new LinkedBlockingQueue<>());
-
-    // Calculate batch size per worker
-    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount));
     this.logCountThreshold = logThreshold;
+    this.maxNumberOfVals = Math.max(1000, this.logCountThreshold / (iteratorCount));
   }
 
   private List<K> getBounds(K startKey, K endKey) throws IOException {
@@ -166,9 +153,6 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
     // Queue to track iterator threads
     Queue<Future<?>> iterFutures = new LinkedList<>();
-    // Queue to track worker threads
-    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
-
     AtomicLong keyCounter = new AtomicLong();
     AtomicLong prevLogCounter = new AtomicLong();
     Object logLock = new Object();
@@ -190,75 +174,46 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
       iterFutures.add(iteratorExecutor.submit(() -> {
         try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) {
           iter.seek(beg);
+          int count = 0;
           while (iter.hasNext()) {
-            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
-            boolean reachedEnd = false;
-            while (iter.hasNext()) {
-              Table.KeyValue<K, V> kv = iter.next();
-              K key = kv.getKey();
-
-              // Check if key is within this segment's range
-              boolean withinBounds;
-              if (inclusive) {
-                // Last segment: include everything from beg onwards (or until endKey if specified)
-                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
-              } else {
-                // Middle segment: include keys in range [beg, end)
-                withinBounds = key.compareTo(end) < 0;
-              }
-
-              if (withinBounds) {
-                keyValues.add(kv);
-              } else {
-                reachedEnd = true;
-                break;
-              }
-
-              // If batch is full, stop collecting
-              if (keyValues.size() >= maxNumberOfVals) {
-                break;
-              }
+            Table.KeyValue<K, V> kv = iter.next();
+            K key = kv.getKey();
+            // Check if key is within this segment's range
+            boolean withinBounds;
+            if (inclusive) {
+              // Last segment: include everything from beg onwards (or until endKey if specified)
+              withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+            } else {
+              // Middle segment: include keys in range [beg, end)
+              withinBounds = key.compareTo(end) < 0;
             }
-
-            // ===== STEP 5: HAND BATCH TO WORKER THREAD =====
-            if (!keyValues.isEmpty()) {
-              // WAIT if worker queue is too full (max 39 pending tasks)
-              waitForQueueSize(workerFutures, maxWorkerTasks - 1);
-
-              // Submit batch to worker thread pool
-              workerFutures.add(valueExecutors.submit(() -> {
-                for (Table.KeyValue<K, V> kv : keyValues) {
-                  keyOperation.apply(kv);
-                }
-                keyCounter.addAndGet(keyValues.size());
-                if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                  synchronized (logLock) {
-                    if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                      long cnt = keyCounter.get();
-                      LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
-                      prevLogCounter.set(cnt);
-                    }
+            if (!withinBounds) {
+              break;
+            }
+            // ===== STEP 4: APPLY OPERATION ON EACH KEY-VALUE =====
+            keyOperation.apply(kv);
+            // ===== STEP 5: EACH ITERATOR THREAD MAINTAINS ITS OWN LOCAL COUNT =====
+            // Each iterator thread counts how many keys it processed locally.
+            count++;
+            if (count % maxNumberOfVals == 0) {
+              // ===== STEP 6: UPDATE GLOBAL COUNT =====
+              keyCounter.addAndGet(count);
+              count = 0;
+              if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                synchronized (logLock) {
+                  if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                    long cnt = keyCounter.get();
+                    LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
+                    prevLogCounter.set(cnt);
                   }
                 }
-                // Worker task done! Future is now complete.
-              }));
-            }
-            // If we reached the end of our segment, stop reading
-            if (reachedEnd) {
-              break;
+              }
             }
           }
+          keyCounter.addAndGet(count);
         } catch (IOException e) {
           LOG.error("IO error during parallel iteration on table {}", taskName, e);
           throw new RuntimeException("IO error during iteration", e);
-        } catch (InterruptedException e) {
-          LOG.warn("Parallel iteration interrupted for task {}", taskName, e);
-          Thread.currentThread().interrupt();
-          throw new RuntimeException("Iteration interrupted", e);
-        } catch (ExecutionException e) {
-          Throwable cause = e.getCause();
-          LOG.error("Task execution failed for {}: {}", taskName, cause.getMessage(), cause);
-          throw new RuntimeException("Task execution failed", cause);
         }
       }));
     }
@@ -266,8 +221,6 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
     // ===== STEP 7: WAIT FOR EVERYONE TO FINISH =====
     // Wait for all iterator threads to finish reading
     waitForQueueSize(iterFutures, 0);
-    // Wait for all worker threads to finish processing
-    waitForQueueSize(workerFutures, 0);
 
     LOG.info("{}: Parallel iteration completed - Total keys processed: {}", taskName, keyCounter.get());
   }
@@ -275,17 +228,12 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
   @Override
   public void close() throws IOException {
     iteratorExecutor.shutdown();
-    valueExecutors.shutdown();
     try {
       if (!iteratorExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
         iteratorExecutor.shutdownNow();
       }
-      if (!valueExecutors.awaitTermination(60, TimeUnit.SECONDS)) {
-        valueExecutors.shutdownNow();
-      }
     } catch (InterruptedException e) {
       iteratorExecutor.shutdownNow();
-      valueExecutors.shutdownNow();
       Thread.currentThread().interrupt();
     }
   }
diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
index 92ee52652107..e78c9a3847cd 100644
--- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
+++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/tasks/TestOmTableInsightTask.java
@@ -41,13 +41,16 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -55,9 +58,12 @@
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
 import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -861,4 +867,69 @@ private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
         .setObjectID(objectID)
         .build();
   }
+
+  @Test
+  public void testParallelIteratorDoesNotCollectKeysInMemory()
+      throws Exception {
+    // Parallel processing is enabled only for string tables (tables with string keys).
+    OmTableInsightTask task =
+        new OmTableInsightTask(reconGlobalStatsManager, reconOMMetadataManager) {
+          @Override
+          public Collection<String> getTaskTables() {
+            return Collections.singletonList(KEY_TABLE);
+          }
+        };
+
+    OMMetadataManager omMetadataManager = mock(OMMetadataManager.class);
+    DBStore store = mock(DBStore.class);
+    when(omMetadataManager.getStore()).thenReturn(store);
+
+    @SuppressWarnings("unchecked")
+    Table<String, byte[]> mockTable =
+        (Table<String, byte[]>) mock(Table.class);
+
+    // Mock the KeyValueIterator returned by iterator().
+    @SuppressWarnings("unchecked")
+    Table.KeyValueIterator<String, byte[]> kvIterator =
+        (Table.KeyValueIterator<String, byte[]>)
+            mock(Table.KeyValueIterator.class);
+
+    @SuppressWarnings("unchecked")
+    Table.KeyValue<String, byte[]> kv =
+        (Table.KeyValue<String, byte[]>) mock(Table.KeyValue.class);
+
+    when(kv.getKey()).thenReturn(
+        new String("/vol1/buck1/key-001".getBytes(StandardCharsets.UTF_8),
+            StandardCharsets.UTF_8));
+    when(kv.getValue()).thenReturn(new byte[] {'v'});
+
+    // Simulate a KeyValueIterator with 5 entries.
+    when(kvIterator.hasNext())
+        .thenReturn(true, true, true, true, true, false);
+    when(kvIterator.next()).thenReturn(kv);
+
+    when(mockTable.iterator()).thenReturn(kvIterator);
+    when(mockTable.getEstimatedKeyCount()).thenReturn(5L);
+
+    when(store.getTable(
+        eq(KEY_TABLE),
+        eq(StringCodec.get()),
+        any(ByteArrayCodec.class),
+        eq(TableCache.CacheType.NO_CACHE)))
+        .thenReturn((Table) mockTable);
+
+    // Invoke reprocess (which triggers parallel processing for tables with string keys).
+    ReconOmTask.TaskResult result = task.reprocess(omMetadataManager);
+    assertTrue(result.isTaskSuccess(),
+        "Parallel processing should succeed");
+
+    String countKey =
+        OmTableInsightTask.getTableCountKeyFromTable(KEY_TABLE);
+    Long count = task.initializeCountMap().get(countKey);
+    // Validate the iterator count.
+    assertEquals(5L, count,
+        "Parallel iterator must count all keys");
+    // Verify that the parallel processing uses table.iterator().
+    verify(mockTable).iterator();
+  }
 }
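
Reviewer note (not part of the patch): the core change above removes the worker thread pool and the in-memory List<Table.KeyValue> batches; each iterator thread now applies keyOperation inline over its own key range and keeps a thread-local count that it flushes to the shared AtomicLong every maxNumberOfVals keys. The standalone sketch below illustrates that range-partitioned, local-count pattern under stated assumptions: it uses a TreeMap as a stand-in for the RocksDB-backed Table/TableIterator, and the class name LocalCountIterationSketch and the flushEvery parameter are hypothetical illustrations, not the Ozone Recon API.

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Sketch only: TreeMap stands in for the RocksDB-backed table; flushEvery
// plays the role of maxNumberOfVals in the patch.
public final class LocalCountIterationSketch {

  public static void main(String[] args) throws Exception {
    // Stand-in for a table of string keys, kept sorted like RocksDB.
    NavigableMap<String, byte[]> table = new TreeMap<>();
    for (int i = 0; i < 10_000; i++) {
      table.put(String.format("/vol/bucket/key-%05d", i), new byte[] {'v'});
    }

    int iteratorCount = 4;    // like maxIterators in the patch
    long flushEvery = 1_000;  // hypothetical stand-in for maxNumberOfVals
    AtomicLong keyCounter = new AtomicLong();  // shared global counter
    Consumer<String> keyOperation = key -> { /* per-key work goes here */ };

    // Split the key space into contiguous segments, one per iterator thread,
    // mirroring what getBounds() does in the patch.
    List<String> bounds = new ArrayList<>(table.navigableKeySet());
    int step = Math.max(1, bounds.size() / iteratorCount);

    ExecutorService pool = Executors.newFixedThreadPool(iteratorCount);
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < bounds.size(); i += step) {
      String beg = bounds.get(i);
      String end = (i + step < bounds.size()) ? bounds.get(i + step) : null;
      futures.add(pool.submit(() -> {
        long local = 0;  // thread-local count, no contention per key
        // subMap/tailMap emulate iter.seek(beg) plus the [beg, end) bound.
        NavigableMap<String, byte[]> segment = (end == null)
            ? table.tailMap(beg, true)
            : table.subMap(beg, true, end, false);
        for (String key : segment.keySet()) {
          keyOperation.accept(key);  // apply the operation inline, no batching
          if (++local % flushEvery == 0) {
            keyCounter.addAndGet(local);  // periodic flush to the global count
            local = 0;
          }
        }
        keyCounter.addAndGet(local);  // flush the remainder at segment end
      }));
    }

    for (Future<?> f : futures) {
      f.get();  // propagate any failure from an iterator thread
    }
    pool.shutdown();
    pool.awaitTermination(60, TimeUnit.SECONDS);
    System.out.println("Processed " + keyCounter.get() + " keys");
  }
}

Flushing the local count in blocks keeps contention on the shared AtomicLong proportional to totalKeys / flushEvery rather than totalKeys, which appears to be the same reasoning the patch applies with maxNumberOfVals, while holding only one key-value at a time per thread in memory.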