@@ -167,7 +167,7 @@ public static boolean reprocess(OMMetadataManager omMetadataManager,
 
     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-            StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+            StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     }
 
@@ -182,7 +182,7 @@ public static boolean reprocessBucketLayout(BucketLayout bucketLayout,
 
     try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
-            StringCodec.get(), maxIterators, maxWorkers, maxKeysInMemory, perWorkerThreshold)) {
+            StringCodec.get(), maxIterators, perWorkerThreshold)) {
       keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
     } catch (Exception ex) {
       LOG.error("Unable to populate File Size Count for {} in RocksDB.", taskName, ex);
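Both reprocess call sites shrink the same way: the worker-pool and in-memory-batch arguments (`maxWorkers`, `maxKeysInMemory`) drop out, leaving only the iterator count and the logging threshold. A minimal sketch of the narrowed contract, using the names that appear at these call sites (the surrounding task code is elided, and all identifiers are assumed to be in scope exactly as above):

```java
// Sketch of the post-change contract only; omMetadataManager, omKeyInfoTable,
// maxIterators, perWorkerThreshold, taskName and kvOperation are assumed to
// be in scope as at the call sites above.
try (ParallelTableIteratorOperation<String, OmKeyInfo> keyIter =
         new ParallelTableIteratorOperation<>(omMetadataManager, omKeyInfoTable,
             StringCodec.get(), maxIterators, perWorkerThreshold)) {
  // Null start/end keys mean "iterate the whole table"; kvOperation now
  // runs inline on the iterator threads rather than on a worker pool.
  keyIter.performTaskOnTableVals(taskName, null, null, kvOperation);
}
```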
@@ -65,7 +65,6 @@ public class OmTableInsightTask implements ReconOmTask {
   private Map<String, Long> objectCountMap;
   private Map<String, Long> unReplicatedSizeMap;
   private Map<String, Long> replicatedSizeMap;
-  private final int maxKeysInMemory;
   private final int maxIterators;
 
   @Inject
@@ -80,9 +79,6 @@ public OmTableInsightTask(ReconGlobalStatsManager reconGlobalStatsManager,
     tableHandlers.put(OPEN_FILE_TABLE, new OpenKeysInsightHandler());
     tableHandlers.put(DELETED_TABLE, new DeletedKeysInsightHandler());
     tableHandlers.put(MULTIPART_INFO_TABLE, new MultipartInfoInsightHandler());
-    this.maxKeysInMemory = reconOMMetadataManager.getOzoneConfiguration().getInt(
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY,
-        ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_KEYS_IN_MEMORY_DEFAULT);
     this.maxIterators = reconOMMetadataManager.getOzoneConfiguration().getInt(
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS,
         ReconServerConfigKeys.OZONE_RECON_TASK_REPROCESS_MAX_ITERATORS_DEFAULT);
@@ -207,8 +203,8 @@ private void processTableInParallel(String tableName, OMMetadataManager omMetada
 
     try (ParallelTableIteratorOperation<String, byte[]> parallelIter = new ParallelTableIteratorOperation<>(
         omMetadataManager, table, StringCodec.get(),
-        maxIterators, workerCount, maxKeysInMemory, loggingThreshold)) {
+        maxIterators, loggingThreshold)) {
 
       parallelIter.performTaskOnTableVals(getTaskName(), null, null, kv -> {
         if (kv != null) {
           count.incrementAndGet();
@@ -20,15 +20,13 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -55,38 +53,27 @@ public class ParallelTableIteratorOperation<K extends Comparable<K>, V> implemen
   private final Codec<K> keyCodec;
 
   // Thread Pools
   private final ExecutorService iteratorExecutor;
-  private final ExecutorService valueExecutors;
-
-  private final int maxNumberOfVals;
+  private final long maxNumberOfVals;
   private final OMMetadataManager metadataManager;
   private final int maxIteratorTasks;
-  private final int maxWorkerTasks;
   private final long logCountThreshold;
 
   private static final Logger LOG = LoggerFactory.getLogger(ParallelTableIteratorOperation.class);
 
   public ParallelTableIteratorOperation(OMMetadataManager metadataManager, Table<K, V> table, Codec<K> keyCodec,
-                                        int iteratorCount, int workerCount, int maxNumberOfValsInMemory,
-                                        long logThreshold) {
+                                        int iteratorCount, long logThreshold) {
     this.table = table;
     this.keyCodec = keyCodec;
     this.metadataManager = metadataManager;
-    this.maxIteratorTasks = 2 * iteratorCount;
-    this.maxWorkerTasks = workerCount * 2;
+    this.maxIteratorTasks = 2 * iteratorCount; // allow up to 2 * iteratorCount pending iterator tasks
 
     // Create team of iterator threads with UNLIMITED queue
     // LinkedBlockingQueue() with no size = can hold infinite pending tasks
     this.iteratorExecutor = new ThreadPoolExecutor(iteratorCount, iteratorCount, 1, TimeUnit.MINUTES,
         new LinkedBlockingQueue<>());
 
-    // Create team of worker threads with UNLIMITED queue
-    this.valueExecutors = new ThreadPoolExecutor(workerCount, workerCount, 1, TimeUnit.MINUTES,
-        new LinkedBlockingQueue<>());
-
-    // Calculate batch size per worker
-    this.maxNumberOfVals = Math.max(10, maxNumberOfValsInMemory / (workerCount));
     this.logCountThreshold = logThreshold;
+    this.maxNumberOfVals = Math.max(1000, this.logCountThreshold / (iteratorCount));
   }
 
   private List<K> getBounds(K startKey, K endKey) throws IOException {
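A note on the new sizing line: the local-count flush interval is now derived from the logging threshold instead of a memory budget, with a floor so that a small threshold cannot degenerate into a shared-counter update per key. Worked through with illustrative numbers (these are example values, not defaults shipped with this change):

```java
// Illustrative values only - not defaults from this PR.
final class FlushIntervalExample {
  static long flushInterval() {
    long logThreshold = 100_000L; // assumed example threshold
    int iteratorCount = 5;        // assumed example iterator count

    // Mirrors the constructor math: each iterator thread folds its local
    // count into the shared AtomicLong every maxNumberOfVals keys,
    // i.e. every 20_000 keys with these inputs.
    return Math.max(1000, logThreshold / iteratorCount);
  }
}
```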
@@ -166,9 +153,6 @@ public void performTaskOnTableVals(String taskName, K startKey, K endKey,
     // Queue to track iterator threads
     Queue<Future<?>> iterFutures = new LinkedList<>();
 
-    // Queue to track worker threads
-    Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();
-
     AtomicLong keyCounter = new AtomicLong();
     AtomicLong prevLogCounter = new AtomicLong();
     Object logLock = new Object();
@@ -190,102 +174,66 @@
       iterFutures.add(iteratorExecutor.submit(() -> {
         try (TableIterator<K, ? extends Table.KeyValue<K, V>> iter = table.iterator()) {
           iter.seek(beg);
+          int count = 0;
           while (iter.hasNext()) {
-            List<Table.KeyValue<K, V>> keyValues = new ArrayList<>();
-            boolean reachedEnd = false;
-            while (iter.hasNext()) {
-              Table.KeyValue<K, V> kv = iter.next();
-              K key = kv.getKey();
-
-              // Check if key is within this segment's range
-              boolean withinBounds;
-              if (inclusive) {
-                // Last segment: include everything from beg onwards (or until endKey if specified)
-                withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
-              } else {
-                // Middle segment: include keys in range [beg, end)
-                withinBounds = key.compareTo(end) < 0;
-              }
-
-              if (withinBounds) {
-                keyValues.add(kv);
-              } else {
-                reachedEnd = true;
-                break;
-              }
-
-              // If batch is full, stop collecting
-              if (keyValues.size() >= maxNumberOfVals) {
-                break;
-              }
-            }
-
-            // ===== STEP 5: HAND BATCH TO WORKER THREAD =====
-            if (!keyValues.isEmpty()) {
-              // WAIT if worker queue is too full (max 39 pending tasks)
-              waitForQueueSize(workerFutures, maxWorkerTasks - 1);
-
-              // Submit batch to worker thread pool
-              workerFutures.add(valueExecutors.submit(() -> {
-                for (Table.KeyValue<K, V> kv : keyValues) {
-                  keyOperation.apply(kv);
-                }
-                keyCounter.addAndGet(keyValues.size());
-                if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                  synchronized (logLock) {
-                    if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
-                      long cnt = keyCounter.get();
-                      LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
-                      prevLogCounter.set(cnt);
-                    }
-                  }
-                }
-                // Worker task done! Future is now complete.
-              }));
-            }
-            // If we reached the end of our segment, stop reading
-            if (reachedEnd) {
-              break;
-            }
+            Table.KeyValue<K, V> kv = iter.next();
+            K key = kv.getKey();
+            // Check if key is within this segment's range
+            boolean withinBounds;
+            if (inclusive) {
+              // Last segment: include everything from beg onwards (or until endKey if specified)
+              withinBounds = (endKey == null || key.compareTo(endKey) <= 0);
+            } else {
+              // Middle segment: include keys in range [beg, end)
+              withinBounds = key.compareTo(end) < 0;
+            }
+            if (!withinBounds) {
+              break;
+            }
+            // ===== STEP 4: APPLY OPERATION ON EACH KEY-VALUE =====
+            keyOperation.apply(kv);
+            // ===== STEP 5: EACH ITERATOR THREAD MAINTAINS ITS OWN LOCAL COUNT =====
+            // Each iterator thread counts how many keys it processed locally.
+            count++;
+            if (count % maxNumberOfVals == 0) {
+              // ===== STEP 6: UPDATE GLOBAL COUNT =====
+              keyCounter.addAndGet(count);
+              count = 0;
+              if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                synchronized (logLock) {
+                  if (keyCounter.get() - prevLogCounter.get() > logCountThreshold) {
+                    long cnt = keyCounter.get();
+                    LOG.debug("Iterated through {} keys while performing task: {}", keyCounter.get(), taskName);
+                    prevLogCounter.set(cnt);
+                  }
+                }
+              }
+            }
           }
+          keyCounter.addAndGet(count);
         } catch (IOException e) {
           LOG.error("IO error during parallel iteration on table {}", taskName, e);
           throw new RuntimeException("IO error during iteration", e);
         } catch (InterruptedException e) {
           LOG.warn("Parallel iteration interrupted for task {}", taskName, e);
           Thread.currentThread().interrupt();
           throw new RuntimeException("Iteration interrupted", e);
         } catch (ExecutionException e) {
           Throwable cause = e.getCause();
           LOG.error("Task execution failed for {}: {}", taskName, cause.getMessage(), cause);
           throw new RuntimeException("Task execution failed", cause);
         }
       }));
     }
 
     // ===== STEP 7: WAIT FOR EVERYONE TO FINISH =====
     // Wait for all iterator threads to finish reading
     waitForQueueSize(iterFutures, 0);
-    // Wait for all worker threads to finish processing
-    waitForQueueSize(workerFutures, 0);
 
     LOG.info("{}: Parallel iteration completed - Total keys processed: {}", taskName, keyCounter.get());
   }
 
   @Override
   public void close() throws IOException {
     iteratorExecutor.shutdown();
-    valueExecutors.shutdown();
     try {
       if (!iteratorExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
         iteratorExecutor.shutdownNow();
       }
-      if (!valueExecutors.awaitTermination(60, TimeUnit.SECONDS)) {
-        valueExecutors.shutdownNow();
-      }
     } catch (InterruptedException e) {
       iteratorExecutor.shutdownNow();
-      valueExecutors.shutdownNow();
       Thread.currentThread().interrupt();
     }
   }
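The behavioral core of this change: batches of Table.KeyValue objects are no longer accumulated into an ArrayList and shipped to a second executor; each iterator thread applies keyOperation inline and only aggregates a primitive counter, so at most one key-value pair per iterator thread is live at a time. The pattern in isolation, as a simplified sketch (process stands in for keyOperation.apply and flushEvery for maxNumberOfVals; the table iteration is replaced by a plain loop):

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, self-contained sketch of the per-thread counting pattern above.
final class LocalCountSketch {

  static void iterate(Iterable<String> keys, AtomicLong globalCounter, long flushEvery) {
    int count = 0; // thread-local: no contention on the hot path
    for (String key : keys) {
      process(key); // work runs inline on the iterating thread;
                    // no batch of key-values is ever buffered in memory
      count++;
      if (count % flushEvery == 0) {
        globalCounter.addAndGet(count); // occasional shared update
        count = 0;
      }
    }
    globalCounter.addAndGet(count); // flush the remainder, as the PR does after the loop
  }

  private static void process(String key) {
    // placeholder for the real key operation
  }
}
```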
@@ -41,23 +41,29 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.StringCodec;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TypedTable;
+import org.apache.hadoop.hdds.utils.db.cache.TableCache;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -861,4 +867,69 @@ private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName,
         .setObjectID(objectID)
         .build();
   }
+
+  @Test
+  public void testParallelIteratorDoesNotCollectKeysInMemory()
+      throws Exception {
+    // Parallel processing is enabled only for tables with string keys.
+    OmTableInsightTask task =
+        new OmTableInsightTask(reconGlobalStatsManager, reconOMMetadataManager) {
+          @Override
+          public Collection<String> getTaskTables() {
+            return Collections.singletonList(KEY_TABLE);
+          }
+        };
+
+    OMMetadataManager omMetadataManager = mock(OMMetadataManager.class);
+    DBStore store = mock(DBStore.class);
+    when(omMetadataManager.getStore()).thenReturn(store);
+
+    @SuppressWarnings("unchecked")
+    Table<String, byte[]> mockTable =
+        (Table<String, byte[]>) mock(Table.class);
+
+    // Mock the KeyValueIterator returned by iterator().
+    @SuppressWarnings("unchecked")
+    Table.KeyValueIterator<String, byte[]> kvIterator =
+        (Table.KeyValueIterator<String, byte[]>)
+            mock(Table.KeyValueIterator.class);
+
+    @SuppressWarnings("unchecked")
+    Table.KeyValue<String, byte[]> kv =
+        (Table.KeyValue<String, byte[]>) mock(Table.KeyValue.class);
+
+    when(kv.getKey()).thenReturn("/vol1/buck1/key-001");
+    when(kv.getValue()).thenReturn(new byte[] {'v'});
+
+    // Simulate a KeyValueIterator with 5 entries.
+    when(kvIterator.hasNext())
+        .thenReturn(true, true, true, true, true, false);
+    when(kvIterator.next()).thenReturn(kv);
+
+    when(mockTable.iterator()).thenReturn(kvIterator);
+    when(mockTable.getEstimatedKeyCount()).thenReturn(5L);
+
+    when(store.getTable(
+        eq(KEY_TABLE),
+        eq(StringCodec.get()),
+        any(ByteArrayCodec.class),
+        eq(TableCache.CacheType.NO_CACHE)))
+        .thenReturn((Table) mockTable);
+
+    // Invoke reprocess, which triggers parallel processing for tables with string keys.
+    ReconOmTask.TaskResult result = task.reprocess(omMetadataManager);
+    assertTrue(result.isTaskSuccess(),
+        "Parallel processing should succeed");
+
+    String countKey =
+        OmTableInsightTask.getTableCountKeyFromTable(KEY_TABLE);
+    Long count = task.initializeCountMap().get(countKey);
+    // Validate the iterated key count.
+    assertEquals(5L, count,
+        "Parallel iterator must count all keys");
+    // Verify that the parallel path uses table.iterator().
+    verify(mockTable).iterator();
+  }
 }
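The test drives five entries through the real ParallelTableIteratorOperation against a mocked iterator, so it exercises the streaming path end to end. Its termination hinges on Mockito's consecutive stubbing, which is worth spelling out since the loop would otherwise never end (a standalone sketch, not part of the test file; the class and method names here are illustrative):

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Iterator;

// Standalone sketch of Mockito's consecutive stubbing, as relied on above:
// each hasNext() call consumes the next canned value, so the sixth call
// returns false and iteration stops after exactly five elements.
final class ConsecutiveStubbingSketch {

  @SuppressWarnings("unchecked")
  static int drainFiveElements() {
    Iterator<String> it = (Iterator<String>) mock(Iterator.class);
    when(it.hasNext()).thenReturn(true, true, true, true, true, false);
    when(it.next()).thenReturn("key");

    int n = 0;
    while (it.hasNext()) {
      it.next();
      n++;
    }
    return n; // 5
  }
}
```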