From 2ed4724ce40fa4ac286d6ef358d3a9fa55299de5 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Sat, 18 Oct 2025 07:25:48 +0200 Subject: [PATCH 1/4] Fix deadlock --- .../remote/merkletree/MerkleTreeComputer.java | 290 +++++++++--------- 1 file changed, 147 insertions(+), 143 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java index 8c2f7315e64633..d4dbeb4d246bf1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java @@ -24,7 +24,6 @@ import static java.util.Map.entry; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.concurrent.CompletableFuture.supplyAsync; import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; @@ -33,6 +32,7 @@ import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -40,6 +40,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.actions.Artifact; @@ -72,9 +73,9 @@ import java.io.IOException; import java.util.AbstractCollection; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Collection; import java.util.Deque; +import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Objects; @@ -297,18 +298,20 @@ public MerkleTree buildForSpawn( path -> toolInputs.contains(path.relativeTo(remotePathResolver.getWorkingDirectory())); } try { - return build( - Lists.transform( - allInputs, - input -> - entry(getOutputPath(input, remotePathResolver, spawn.getPathMapper()), input)), - isToolInput, - scrubber != null ? scrubber.forSpawn(spawn) : null, - spawnExecutionContext.getInputMetadataProvider(), - spawnExecutionContext.getPathResolver(), - remoteActionExecutionContext, - remotePathResolver, - blobPolicy); + return getFromFuture( + build( + Lists.transform( + allInputs, + input -> + entry( + getOutputPath(input, remotePathResolver, spawn.getPathMapper()), input)), + isToolInput, + scrubber != null ? scrubber.forSpawn(spawn) : null, + spawnExecutionContext.getInputMetadataProvider(), + spawnExecutionContext.getPathResolver(), + remoteActionExecutionContext, + remotePathResolver, + blobPolicy)); } catch (BulkTransferException e) { e.getLostArtifacts(spawnExecutionContext.getInputMetadataProvider()::getInput) .throwIfNotEmpty(); @@ -370,21 +373,22 @@ public MerkleTree.Uploadable buildForFiles(Map inputs) throws IOException, InterruptedException { // BlobPolicy.KEEP_AND_REUPLOAD always results in a MerkleTree.Uploadable. return (MerkleTree.Uploadable) - build( - Lists.transform( - ImmutableList.sortedCopyOf( - Map.Entry.comparingByKey(HIERARCHICAL_COMPARATOR), inputs.entrySet()), - e -> entry(e.getKey(), new ActionInputWithPath(e.getValue()))), - alwaysFalse(), - /* spawnScrubber= */ null, - StaticInputMetadataProvider.empty(), - actionInputWithPathResolver, - /* remoteActionExecutionContext= */ null, - /* remotePathResolver= */ null, - BlobPolicy.KEEP_AND_REUPLOAD); + getFromFuture( + build( + Lists.transform( + ImmutableList.sortedCopyOf( + Map.Entry.comparingByKey(HIERARCHICAL_COMPARATOR), inputs.entrySet()), + e -> entry(e.getKey(), new ActionInputWithPath(e.getValue()))), + alwaysFalse(), + /* spawnScrubber= */ null, + StaticInputMetadataProvider.empty(), + actionInputWithPathResolver, + /* remoteActionExecutionContext= */ null, + /* remotePathResolver= */ null, + BlobPolicy.KEEP_AND_REUPLOAD)); } - private MerkleTree build( + private CompletableFuture build( Collection> sortedInputs, Predicate isToolInput, @Nullable SpawnScrubber spawnScrubber, @@ -393,22 +397,49 @@ private MerkleTree build( @Nullable RemoteActionExecutionContext remoteActionExecutionContext, @Nullable RemotePathResolver remotePathResolver, BlobPolicy blobPolicy) + throws IOException { + return precomputeSubTrees( + sortedInputs, + isToolInput, + metadataProvider, + artifactPathResolver, + remoteActionExecutionContext, + remotePathResolver, + blobPolicy) + .thenApplyAsync( + subTreeRoots -> { + try { + return buildWithPrecomputedSubTrees( + subTreeRoots, + sortedInputs, + isToolInput, + spawnScrubber, + metadataProvider, + artifactPathResolver, + blobPolicy); + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + }, + MERKLE_TREE_BUILD_POOL); + } + + private MerkleTree buildWithPrecomputedSubTrees( + ImmutableMap, MerkleTree.RootOnly> + subTreeRoots, + Collection> sortedInputs, + Predicate isToolInput, + @Nullable SpawnScrubber spawnScrubber, + InputMetadataProvider metadataProvider, + ArtifactPathResolver artifactPathResolver, + BlobPolicy blobPolicy) throws IOException, InterruptedException { if (sortedInputs.isEmpty()) { return emptyTree; } - var unused = - getFromFuture( - cacheSubTrees( - sortedInputs, - isToolInput, - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); - long inputFiles = 0; long inputBytes = 0; var blobs = ImmutableMap.builder(); @@ -484,39 +515,15 @@ private MerkleTree build( var nodeProperties = isToolInput.test(path) ? TOOL_NODE_PROPERTIES : null; switch (input) { - case Artifact treeArtifact when treeArtifact.isTreeArtifact() -> { + case Artifact.SpecialArtifact specialArtifact + when specialArtifact.isTreeArtifact() || specialArtifact.isRunfilesTree() -> { var subTreeRoot = - getFromFuture( - computeForTreeArtifactIfAbsent( - metadataProvider.getTreeMetadata(treeArtifact), - path, - isToolInput, - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); + Preconditions.checkNotNull(subTreeRoots.get(entry), "missing subtree for %s", input); currentDirectory.addDirectoriesBuilder().setName(name).setDigest(subTreeRoot.digest()); inputFiles += subTreeRoot.inputFiles(); inputBytes += subTreeRoot.inputBytes(); } - case Artifact runfilesArtifact when runfilesArtifact.isRunfilesTree() -> { - var subTreeRoot = - getFromFuture( - computeForRunfilesTreeIfAbsent( - metadataProvider.getRunfilesMetadata(runfilesArtifact), - path, - isToolInput, - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); - currentDirectory.addDirectoriesBuilder().setName(name).setDigest(subTreeRoot.digest()); - inputFiles += subTreeRoot.inputFiles(); - inputBytes += subTreeRoot.inputBytes(); - } - case Artifact symlink when symlink.isSymlink() -> { + case Artifact.SpecialArtifact symlink when symlink.isSymlink() -> { Path symlinkPath = artifactPathResolver.toPath(symlink); var builder = currentDirectory @@ -536,18 +543,8 @@ private MerkleTree build( fileOrSourceDirectory); if (metadata.getType() == FileStateType.DIRECTORY) { var subTreeRoot = - getFromFuture( - computeIfAbsent( - metadata, - () -> - explodeDirectory(artifactPathResolver.toPath(fileOrSourceDirectory)) - .entrySet(), - isToolInput.test(path), - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); + Preconditions.checkNotNull( + subTreeRoots.get(entry), "missing subtree for %s", input); currentDirectory.addDirectoriesBuilder().setName(name).setDigest(subTreeRoot.digest()); inputFiles += subTreeRoot.inputFiles(); inputBytes += subTreeRoot.inputBytes(); @@ -617,16 +614,22 @@ private MerkleTree build( throw new IllegalStateException("not reached"); } - private CompletableFuture cacheSubTrees( - Collection> sortedInputs, - Predicate isToolInput, - InputMetadataProvider metadataProvider, - ArtifactPathResolver artifactPathResolver, - RemoteActionExecutionContext remoteActionExecutionContext, - RemotePathResolver remotePathResolver, - BlobPolicy blobPolicy) - throws IOException { - ArrayList> subTreeFutures = new ArrayList<>(); + private CompletableFuture< + ImmutableMap< + ? extends Map.Entry, MerkleTree.RootOnly>> + precomputeSubTrees( + Collection> sortedInputs, + Predicate isToolInput, + InputMetadataProvider metadataProvider, + ArtifactPathResolver artifactPathResolver, + RemoteActionExecutionContext remoteActionExecutionContext, + RemotePathResolver remotePathResolver, + BlobPolicy blobPolicy) + throws IOException { + var subTreeFutures = + new HashMap< + Map.Entry, + CompletableFuture>(); for (var entry : sortedInputs) { var future = maybeCacheSubtree( @@ -639,14 +642,17 @@ private CompletableFuture cacheSubTrees( remotePathResolver, blobPolicy); if (future != null) { - subTreeFutures.add(future); + subTreeFutures.put(entry, future); } } - return allOf(subTreeFutures.toArray(CompletableFuture[]::new)); + return allOf(subTreeFutures.values().toArray(CompletableFuture[]::new)) + .thenApply( + unused -> + ImmutableMap.copyOf(Maps.transformValues(subTreeFutures, CompletableFuture::join))); } @Nullable - private CompletableFuture maybeCacheSubtree( + private CompletableFuture maybeCacheSubtree( @Nullable ActionInput input, PathFragment mappedExecPath, Predicate isToolInput, @@ -825,60 +831,58 @@ private CompletableFuture computeIfAbsent( return inFlightComputation; } } - return supplyAsync( - () -> { - try { - // Subtrees either consist entirely of tool inputs or don't contain any. - // The same applies to scrubbed inputs. - return build( - sortedInputsSupplier.compute(), - isTool ? alwaysTrue() : alwaysFalse(), - /* spawnScrubber= */ null, - metadataProvider, - artifactPathResolver, + CompletableFuture merkleTreeFuture; + try { + // Subtrees either consist entirely of tool inputs or don't contain any. + // The same applies to scrubbed inputs. + merkleTreeFuture = + build( + sortedInputsSupplier.compute(), + isTool ? alwaysTrue() : alwaysFalse(), + /* spawnScrubber= */ null, + metadataProvider, + artifactPathResolver, + remoteActionExecutionContext, + remotePathResolver, + blobPolicy); + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + return merkleTreeFuture.thenApplyAsync( + merkleTree -> { + if (merkleTree instanceof MerkleTree.Uploadable uploadable) { + try { + if (merkleTreeUploader != null) { + merkleTreeUploader.ensureInputsPresent( remoteActionExecutionContext, - remotePathResolver, - blobPolicy); - } catch (IOException e) { - throw new WrappedException(e); - } catch (InterruptedException e) { - throw new WrappedException(e); - } - }, - MERKLE_TREE_BUILD_POOL) - .thenApplyAsync( - merkleTree -> { - if (merkleTree instanceof MerkleTree.Uploadable uploadable) { - try { - if (merkleTreeUploader != null) { - merkleTreeUploader.ensureInputsPresent( - remoteActionExecutionContext, - uploadable, - blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD, - remotePathResolver); - } - } catch (IOException e) { - throw new WrappedException(e); - } catch (InterruptedException e) { - throw new WrappedException(e); - } + uploadable, + blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD, + remotePathResolver); } - // Move the computed root to the persistent cache so that it can be reused - // by later builds. - persistentCache - .asMap() - .compute( - metadata, - (unused, oldRoot) -> { - // Don't downgrade the cached root from one indicating that its - // blobs have been uploaded. - return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded - ? oldRoot - : merkleTree.root(); - }); - return merkleTree.root(); - }, - MERKLE_TREE_UPLOAD_POOL); + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + } + // Move the computed root to the persistent cache so that it can be reused + // by later builds. + persistentCache + .asMap() + .compute( + metadata, + (unused, oldRoot) -> { + // Don't downgrade the cached root from one indicating that its + // blobs have been uploaded. + return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded + ? oldRoot + : merkleTree.root(); + }); + return merkleTree.root(); + }, + MERKLE_TREE_UPLOAD_POOL); }) // This part of the future must be kept outside the cache lambda to avoid recursive updates // to the in-flight cache. From 5f95d0cb0c70f944092092000208c63e3eb7a499 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Sat, 18 Oct 2025 16:54:36 +0200 Subject: [PATCH 2/4] Add test --- .../build/lib/remote/merkletree/BUILD | 1 + .../remote/merkletree/MerkleTreeComputer.java | 6 ++++- .../remote/RemoteExecutionServiceTest.java | 27 +++++++++++++++++-- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD index c8c999e35c31da..0a646ae9c0759a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD @@ -28,6 +28,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", + "//src/main/java/com/google/devtools/build/lib/util:TestType", "//src/main/java/com/google/devtools/build/lib/util:string_encoding", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java index d4dbeb4d246bf1..f68dfa0b5a27b4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java @@ -65,6 +65,7 @@ import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.skyframe.TreeArtifactValue; +import com.google.devtools.build.lib.util.TestType; import com.google.devtools.build.lib.vfs.Dirent; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; @@ -138,7 +139,10 @@ public final class MerkleTreeComputer { // TODO: Source directories are also visited on this pool in a single-threaded manner. private static final ExecutorService MERKLE_TREE_BUILD_POOL = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), + // Run with reduced parallelism in tests to reproduce potential deadlocks more easily. + Math.min( + TestType.isInTest() ? 4 : Integer.MAX_VALUE, + Runtime.getRuntime().availableProcessors()), Thread.ofPlatform().name("merkle-tree-build-", 0).factory()); // Uploading Merkle trees mostly involves waiting on networking futures, for which virtual threads diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index f3584298e63bd5..13f6e04da3b89b 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -146,6 +146,7 @@ import java.util.Collections; import java.util.Random; import java.util.SortedMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -747,8 +748,30 @@ public void buildRemoteAction_goldenTest(@TestParameter({"1", "2", "3"}) int see }); assertThat(digestUtil.compute(rootDirectory)).isEqualTo(expectedDigest); - assertThat(service.buildRemoteAction(spawn, context).getMerkleTree().digest()) - .isEqualTo(expectedDigest); + + // Verify that multiple concurrent Merkle tree builds all produce the same result and don't + // interfere with each other. + var exceptions = new ConcurrentLinkedDeque(); + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + for (int i = 0; i < 16; i++) { + executor.execute( + () -> { + try { + assertThat(service.buildRemoteAction(spawn, context).getMerkleTree().digest()) + .isEqualTo(expectedDigest); + } catch (Exception e) { + exceptions.add(e); + } + }); + } + } + if (!exceptions.isEmpty()) { + var combinedException = new AssertionError("Exceptions in golden test runs:"); + for (var e : exceptions) { + combinedException.addSuppressed(e); + } + throw combinedException; + } } private FileNode file(String name, String content) { From 331a2c0cdfd62c1ff7f34c351ff45c66e90e7677 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Mon, 20 Oct 2025 00:05:13 +0200 Subject: [PATCH 3/4] Update RemoteExecutionServiceTest.java --- .../devtools/build/lib/remote/RemoteExecutionServiceTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 13f6e04da3b89b..e44cd8aa0e1593 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -751,7 +751,7 @@ public void buildRemoteAction_goldenTest(@TestParameter({"1", "2", "3"}) int see // Verify that multiple concurrent Merkle tree builds all produce the same result and don't // interfere with each other. - var exceptions = new ConcurrentLinkedDeque(); + var exceptions = new ConcurrentLinkedDeque(); try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { for (int i = 0; i < 16; i++) { executor.execute( @@ -759,7 +759,7 @@ public void buildRemoteAction_goldenTest(@TestParameter({"1", "2", "3"}) int see try { assertThat(service.buildRemoteAction(spawn, context).getMerkleTree().digest()) .isEqualTo(expectedDigest); - } catch (Exception e) { + } catch (Throwable e) { exceptions.add(e); } }); From be0ba53a0e16828d72bee24a6f89967d4e8e1d2a Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Tue, 21 Oct 2025 12:01:16 +0200 Subject: [PATCH 4/4] Rewrite to ListenableFuture --- .../remote/merkletree/MerkleTreeComputer.java | 253 +++++++++--------- 1 file changed, 130 insertions(+), 123 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java index f68dfa0b5a27b4..82bea5852b555a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java @@ -18,18 +18,21 @@ import static com.google.common.base.Predicates.alwaysFalse; import static com.google.common.base.Predicates.alwaysTrue; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.util.concurrent.Futures.allAsList; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static com.google.common.util.concurrent.Futures.submitAsync; +import static com.google.common.util.concurrent.Futures.transform; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.devtools.build.lib.util.StringEncoding.internalToUnicode; import static com.google.devtools.build.lib.vfs.PathFragment.HIERARCHICAL_COMPARATOR; import static java.util.Comparator.comparing; import static java.util.Map.entry; -import static java.util.concurrent.CompletableFuture.allOf; -import static java.util.concurrent.CompletableFuture.completedFuture; +import build.bazel.remote.execution.v2.Action; import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; import build.bazel.remote.execution.v2.NodeProperties; import build.bazel.remote.execution.v2.NodeProperty; -import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.base.Preconditions; @@ -40,7 +43,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.actions.Artifact; @@ -74,18 +77,19 @@ import java.io.IOException; import java.util.AbstractCollection; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Deque; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -167,8 +171,8 @@ public final class MerkleTreeComputer { private final String workspaceName; private final Digest emptyDigest; private final MerkleTree.Uploadable emptyTree; - private final AsyncCache inFlightSubTreeCache = - Caffeine.newBuilder().buildAsync(); + private final ConcurrentHashMap> + inFlightSubTreeCache = new ConcurrentHashMap<>(); public MerkleTreeComputer( DigestUtil digestUtil, @@ -261,7 +265,7 @@ public MerkleTree buildForSpawn( if (!Objects.equals(scrubber, lastScrubber)) { persistentToolSubTreeCache.invalidateAll(); persistentNonToolSubTreeCache.invalidateAll(); - inFlightSubTreeCache.synchronous().invalidateAll(); + inFlightSubTreeCache.clear(); lastScrubber = scrubber; } } @@ -392,7 +396,7 @@ public MerkleTree.Uploadable buildForFiles(Map inputs) BlobPolicy.KEEP_AND_REUPLOAD)); } - private CompletableFuture build( + private ListenableFuture build( Collection> sortedInputs, Predicate isToolInput, @Nullable SpawnScrubber spawnScrubber, @@ -402,32 +406,32 @@ private CompletableFuture build( @Nullable RemotePathResolver remotePathResolver, BlobPolicy blobPolicy) throws IOException { - return precomputeSubTrees( + return transform( + precomputeSubTrees( sortedInputs, isToolInput, metadataProvider, artifactPathResolver, remoteActionExecutionContext, remotePathResolver, - blobPolicy) - .thenApplyAsync( - subTreeRoots -> { - try { - return buildWithPrecomputedSubTrees( - subTreeRoots, - sortedInputs, - isToolInput, - spawnScrubber, - metadataProvider, - artifactPathResolver, - blobPolicy); - } catch (IOException e) { - throw new WrappedException(e); - } catch (InterruptedException e) { - throw new WrappedException(e); - } - }, - MERKLE_TREE_BUILD_POOL); + blobPolicy), + subTreeRoots -> { + try { + return buildWithPrecomputedSubTrees( + subTreeRoots, + sortedInputs, + isToolInput, + spawnScrubber, + metadataProvider, + artifactPathResolver, + blobPolicy); + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + }, + MERKLE_TREE_BUILD_POOL); } private MerkleTree buildWithPrecomputedSubTrees( @@ -618,7 +622,7 @@ private MerkleTree buildWithPrecomputedSubTrees( throw new IllegalStateException("not reached"); } - private CompletableFuture< + private ListenableFuture< ImmutableMap< ? extends Map.Entry, MerkleTree.RootOnly>> precomputeSubTrees( @@ -631,9 +635,9 @@ private MerkleTree buildWithPrecomputedSubTrees( BlobPolicy blobPolicy) throws IOException { var subTreeFutures = - new HashMap< - Map.Entry, - CompletableFuture>(); + new ArrayList< + ListenableFuture< + Map.Entry, MerkleTree.RootOnly>>>(); for (var entry : sortedInputs) { var future = maybeCacheSubtree( @@ -646,17 +650,14 @@ private MerkleTree buildWithPrecomputedSubTrees( remotePathResolver, blobPolicy); if (future != null) { - subTreeFutures.put(entry, future); + subTreeFutures.add(transform(future, subTree -> entry(entry, subTree), directExecutor())); } } - return allOf(subTreeFutures.values().toArray(CompletableFuture[]::new)) - .thenApply( - unused -> - ImmutableMap.copyOf(Maps.transformValues(subTreeFutures, CompletableFuture::join))); + return transform(allAsList(subTreeFutures), ImmutableMap::copyOf, directExecutor()); } @Nullable - private CompletableFuture maybeCacheSubtree( + private ListenableFuture maybeCacheSubtree( @Nullable ActionInput input, PathFragment mappedExecPath, Predicate isToolInput, @@ -708,7 +709,7 @@ yield computeIfAbsent( }; } - private CompletableFuture computeForRunfilesTreeIfAbsent( + private ListenableFuture computeForRunfilesTreeIfAbsent( RunfilesArtifactValue runfilesArtifactValue, PathFragment mappedExecPath, Predicate isToolInput, @@ -750,7 +751,7 @@ private CompletableFuture computeForRunfilesTreeIfAbsent( blobPolicy); } - private CompletableFuture computeForTreeArtifactIfAbsent( + private ListenableFuture computeForTreeArtifactIfAbsent( TreeArtifactValue treeArtifactValue, PathFragment mappedExecPath, Predicate isToolInput, @@ -789,7 +790,7 @@ private interface SortedInputsSupplier { throws IOException, InterruptedException; } - private CompletableFuture computeIfAbsent( + private ListenableFuture computeIfAbsent( FileArtifactValue metadata, SortedInputsSupplier sortedInputsSupplier, boolean isTool, @@ -806,96 +807,102 @@ private CompletableFuture computeIfAbsent( if (cachedRoot != null && (blobPolicy == BlobPolicy.DISCARD || cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) { - return completedFuture(cachedRoot); + return immediateFuture(cachedRoot); } } var inFlightCacheKey = new InFlightCacheKey(metadata, isTool, blobPolicy != BlobPolicy.DISCARD); if (blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD) { - inFlightSubTreeCache.synchronous().invalidate(inFlightCacheKey); + inFlightSubTreeCache.remove(inFlightCacheKey); } - return inFlightSubTreeCache - .get( + var newlyComputed = new AtomicBoolean(); + var future = + inFlightSubTreeCache.computeIfAbsent( inFlightCacheKey, - (key, unusedExecutor) -> { - // There is a window in which a concurrent call may have removed the in-flight cache - // entry while this one had already passed the check above. Recheck the persistent - // cache to avoid unnecessary work. - var cachedRoot = persistentCache.getIfPresent(metadata); - if (cachedRoot != null - && (blobPolicy == BlobPolicy.DISCARD - || cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) { - return completedFuture(cachedRoot); - } - // An ongoing computation with blobs can be reused for one that doesn't require them. - if (blobPolicy == BlobPolicy.DISCARD) { - var inFlightComputation = - inFlightSubTreeCache.getIfPresent( - new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true)); - if (inFlightComputation != null) { - return inFlightComputation; - } - } - CompletableFuture merkleTreeFuture; - try { - // Subtrees either consist entirely of tool inputs or don't contain any. - // The same applies to scrubbed inputs. - merkleTreeFuture = - build( - sortedInputsSupplier.compute(), - isTool ? alwaysTrue() : alwaysFalse(), - /* spawnScrubber= */ null, - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy); - } catch (IOException e) { - throw new WrappedException(e); - } catch (InterruptedException e) { - throw new WrappedException(e); - } - return merkleTreeFuture.thenApplyAsync( - merkleTree -> { - if (merkleTree instanceof MerkleTree.Uploadable uploadable) { - try { - if (merkleTreeUploader != null) { - merkleTreeUploader.ensureInputsPresent( - remoteActionExecutionContext, - uploadable, - blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD, - remotePathResolver); - } - } catch (IOException e) { - throw new WrappedException(e); - } catch (InterruptedException e) { - throw new WrappedException(e); + unusedKey -> { + newlyComputed.set(true); + return submitAsync( + () -> { + // There is a window in which a concurrent call may have removed the in-flight + // cache entry while this one had already passed the check above. Recheck the + // persistent cache to avoid unnecessary work. + var cachedRoot = persistentCache.getIfPresent(metadata); + if (cachedRoot != null + && (blobPolicy == BlobPolicy.DISCARD + || cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) { + return immediateFuture(cachedRoot); + } + // An ongoing computation with blobs can be reused for one that doesn't require + // them. + if (blobPolicy == BlobPolicy.DISCARD) { + var inFlightComputation = + inFlightSubTreeCache.get( + new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true)); + if (inFlightComputation != null) { + return inFlightComputation; } } - // Move the computed root to the persistent cache so that it can be reused - // by later builds. - persistentCache - .asMap() - .compute( - metadata, - (unused, oldRoot) -> { - // Don't downgrade the cached root from one indicating that its - // blobs have been uploaded. - return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded - ? oldRoot - : merkleTree.root(); - }); - return merkleTree.root(); + ListenableFuture merkleTreeFuture; + try { + // Subtrees either consist entirely of tool inputs or don't contain any. + // The same applies to scrubbed inputs. + merkleTreeFuture = + build( + sortedInputsSupplier.compute(), + isTool ? alwaysTrue() : alwaysFalse(), + /* spawnScrubber= */ null, + metadataProvider, + artifactPathResolver, + remoteActionExecutionContext, + remotePathResolver, + blobPolicy); + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + return transform( + merkleTreeFuture, + merkleTree -> { + if (merkleTree instanceof MerkleTree.Uploadable uploadable) { + try { + if (merkleTreeUploader != null) { + merkleTreeUploader.ensureInputsPresent( + remoteActionExecutionContext, + uploadable, + blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD, + remotePathResolver); + } + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + } + // Move the computed root to the persistent cache so that it can be reused + // by later builds. + persistentCache + .asMap() + .compute( + metadata, + (unused, oldRoot) -> { + // Don't downgrade the cached root from one indicating that its + // blobs have been uploaded. + return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded + ? oldRoot + : merkleTree.root(); + }); + return merkleTree.root(); + }, + MERKLE_TREE_UPLOAD_POOL); }, - MERKLE_TREE_UPLOAD_POOL); - }) - // This part of the future must be kept outside the cache lambda to avoid recursive updates - // to the in-flight cache. - .thenApply( - root -> { - // Clean up the in-flight cache so that it doesn't grow unboundedly. - inFlightSubTreeCache.synchronous().invalidate(inFlightCacheKey); - return root; + MERKLE_TREE_BUILD_POOL); }); + if (newlyComputed.get()) { + // Clean up the in-flight cache so that it doesn't grow unboundedly. + future.addListener( + () -> inFlightSubTreeCache.remove(inFlightCacheKey, future), directExecutor()); + } + return future; } private static T getFromFuture(Future future) throws IOException, InterruptedException {