diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD index c8c999e35c31da..0a646ae9c0759a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD @@ -28,6 +28,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", + "//src/main/java/com/google/devtools/build/lib/util:TestType", "//src/main/java/com/google/devtools/build/lib/util:string_encoding", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java index 8c2f7315e64633..82bea5852b555a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeComputer.java @@ -18,21 +18,24 @@ import static com.google.common.base.Predicates.alwaysFalse; import static com.google.common.base.Predicates.alwaysTrue; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.util.concurrent.Futures.allAsList; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static com.google.common.util.concurrent.Futures.submitAsync; +import static com.google.common.util.concurrent.Futures.transform; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.devtools.build.lib.util.StringEncoding.internalToUnicode; import static com.google.devtools.build.lib.vfs.PathFragment.HIERARCHICAL_COMPARATOR; import static java.util.Comparator.comparing; import static java.util.Map.entry; -import static java.util.concurrent.CompletableFuture.allOf; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.concurrent.CompletableFuture.supplyAsync; +import build.bazel.remote.execution.v2.Action; import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; import build.bazel.remote.execution.v2.NodeProperties; import build.bazel.remote.execution.v2.NodeProperty; -import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -40,6 +43,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.actions.Artifact; @@ -64,6 +68,7 @@ import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.skyframe.TreeArtifactValue; +import com.google.devtools.build.lib.util.TestType; import com.google.devtools.build.lib.vfs.Dirent; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; @@ -79,11 +84,12 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; import javax.annotation.Nullable; @@ -137,7 +143,10 @@ public final class MerkleTreeComputer { // TODO: Source directories are also visited on this pool in a single-threaded manner. private static final ExecutorService MERKLE_TREE_BUILD_POOL = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), + // Run with reduced parallelism in tests to reproduce potential deadlocks more easily. + Math.min( + TestType.isInTest() ? 4 : Integer.MAX_VALUE, + Runtime.getRuntime().availableProcessors()), Thread.ofPlatform().name("merkle-tree-build-", 0).factory()); // Uploading Merkle trees mostly involves waiting on networking futures, for which virtual threads @@ -162,8 +171,8 @@ public final class MerkleTreeComputer { private final String workspaceName; private final Digest emptyDigest; private final MerkleTree.Uploadable emptyTree; - private final AsyncCache inFlightSubTreeCache = - Caffeine.newBuilder().buildAsync(); + private final ConcurrentHashMap> + inFlightSubTreeCache = new ConcurrentHashMap<>(); public MerkleTreeComputer( DigestUtil digestUtil, @@ -256,7 +265,7 @@ public MerkleTree buildForSpawn( if (!Objects.equals(scrubber, lastScrubber)) { persistentToolSubTreeCache.invalidateAll(); persistentNonToolSubTreeCache.invalidateAll(); - inFlightSubTreeCache.synchronous().invalidateAll(); + inFlightSubTreeCache.clear(); lastScrubber = scrubber; } } @@ -297,18 +306,20 @@ public MerkleTree buildForSpawn( path -> toolInputs.contains(path.relativeTo(remotePathResolver.getWorkingDirectory())); } try { - return build( - Lists.transform( - allInputs, - input -> - entry(getOutputPath(input, remotePathResolver, spawn.getPathMapper()), input)), - isToolInput, - scrubber != null ? scrubber.forSpawn(spawn) : null, - spawnExecutionContext.getInputMetadataProvider(), - spawnExecutionContext.getPathResolver(), - remoteActionExecutionContext, - remotePathResolver, - blobPolicy); + return getFromFuture( + build( + Lists.transform( + allInputs, + input -> + entry( + getOutputPath(input, remotePathResolver, spawn.getPathMapper()), input)), + isToolInput, + scrubber != null ? scrubber.forSpawn(spawn) : null, + spawnExecutionContext.getInputMetadataProvider(), + spawnExecutionContext.getPathResolver(), + remoteActionExecutionContext, + remotePathResolver, + blobPolicy)); } catch (BulkTransferException e) { e.getLostArtifacts(spawnExecutionContext.getInputMetadataProvider()::getInput) .throwIfNotEmpty(); @@ -370,21 +381,22 @@ public MerkleTree.Uploadable buildForFiles(Map inputs) throws IOException, InterruptedException { // BlobPolicy.KEEP_AND_REUPLOAD always results in a MerkleTree.Uploadable. return (MerkleTree.Uploadable) - build( - Lists.transform( - ImmutableList.sortedCopyOf( - Map.Entry.comparingByKey(HIERARCHICAL_COMPARATOR), inputs.entrySet()), - e -> entry(e.getKey(), new ActionInputWithPath(e.getValue()))), - alwaysFalse(), - /* spawnScrubber= */ null, - StaticInputMetadataProvider.empty(), - actionInputWithPathResolver, - /* remoteActionExecutionContext= */ null, - /* remotePathResolver= */ null, - BlobPolicy.KEEP_AND_REUPLOAD); + getFromFuture( + build( + Lists.transform( + ImmutableList.sortedCopyOf( + Map.Entry.comparingByKey(HIERARCHICAL_COMPARATOR), inputs.entrySet()), + e -> entry(e.getKey(), new ActionInputWithPath(e.getValue()))), + alwaysFalse(), + /* spawnScrubber= */ null, + StaticInputMetadataProvider.empty(), + actionInputWithPathResolver, + /* remoteActionExecutionContext= */ null, + /* remotePathResolver= */ null, + BlobPolicy.KEEP_AND_REUPLOAD)); } - private MerkleTree build( + private ListenableFuture build( Collection> sortedInputs, Predicate isToolInput, @Nullable SpawnScrubber spawnScrubber, @@ -393,21 +405,48 @@ private MerkleTree build( @Nullable RemoteActionExecutionContext remoteActionExecutionContext, @Nullable RemotePathResolver remotePathResolver, BlobPolicy blobPolicy) - throws IOException, InterruptedException { - if (sortedInputs.isEmpty()) { - return emptyTree; - } - - var unused = - getFromFuture( - cacheSubTrees( + throws IOException { + return transform( + precomputeSubTrees( + sortedInputs, + isToolInput, + metadataProvider, + artifactPathResolver, + remoteActionExecutionContext, + remotePathResolver, + blobPolicy), + subTreeRoots -> { + try { + return buildWithPrecomputedSubTrees( + subTreeRoots, sortedInputs, isToolInput, + spawnScrubber, metadataProvider, artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); + blobPolicy); + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + }, + MERKLE_TREE_BUILD_POOL); + } + + private MerkleTree buildWithPrecomputedSubTrees( + ImmutableMap, MerkleTree.RootOnly> + subTreeRoots, + Collection> sortedInputs, + Predicate isToolInput, + @Nullable SpawnScrubber spawnScrubber, + InputMetadataProvider metadataProvider, + ArtifactPathResolver artifactPathResolver, + BlobPolicy blobPolicy) + throws IOException, InterruptedException { + if (sortedInputs.isEmpty()) { + return emptyTree; + } long inputFiles = 0; long inputBytes = 0; @@ -484,39 +523,15 @@ private MerkleTree build( var nodeProperties = isToolInput.test(path) ? TOOL_NODE_PROPERTIES : null; switch (input) { - case Artifact treeArtifact when treeArtifact.isTreeArtifact() -> { + case Artifact.SpecialArtifact specialArtifact + when specialArtifact.isTreeArtifact() || specialArtifact.isRunfilesTree() -> { var subTreeRoot = - getFromFuture( - computeForTreeArtifactIfAbsent( - metadataProvider.getTreeMetadata(treeArtifact), - path, - isToolInput, - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); + Preconditions.checkNotNull(subTreeRoots.get(entry), "missing subtree for %s", input); currentDirectory.addDirectoriesBuilder().setName(name).setDigest(subTreeRoot.digest()); inputFiles += subTreeRoot.inputFiles(); inputBytes += subTreeRoot.inputBytes(); } - case Artifact runfilesArtifact when runfilesArtifact.isRunfilesTree() -> { - var subTreeRoot = - getFromFuture( - computeForRunfilesTreeIfAbsent( - metadataProvider.getRunfilesMetadata(runfilesArtifact), - path, - isToolInput, - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); - currentDirectory.addDirectoriesBuilder().setName(name).setDigest(subTreeRoot.digest()); - inputFiles += subTreeRoot.inputFiles(); - inputBytes += subTreeRoot.inputBytes(); - } - case Artifact symlink when symlink.isSymlink() -> { + case Artifact.SpecialArtifact symlink when symlink.isSymlink() -> { Path symlinkPath = artifactPathResolver.toPath(symlink); var builder = currentDirectory @@ -536,18 +551,8 @@ private MerkleTree build( fileOrSourceDirectory); if (metadata.getType() == FileStateType.DIRECTORY) { var subTreeRoot = - getFromFuture( - computeIfAbsent( - metadata, - () -> - explodeDirectory(artifactPathResolver.toPath(fileOrSourceDirectory)) - .entrySet(), - isToolInput.test(path), - metadataProvider, - artifactPathResolver, - remoteActionExecutionContext, - remotePathResolver, - blobPolicy)); + Preconditions.checkNotNull( + subTreeRoots.get(entry), "missing subtree for %s", input); currentDirectory.addDirectoriesBuilder().setName(name).setDigest(subTreeRoot.digest()); inputFiles += subTreeRoot.inputFiles(); inputBytes += subTreeRoot.inputBytes(); @@ -617,16 +622,22 @@ private MerkleTree build( throw new IllegalStateException("not reached"); } - private CompletableFuture cacheSubTrees( - Collection> sortedInputs, - Predicate isToolInput, - InputMetadataProvider metadataProvider, - ArtifactPathResolver artifactPathResolver, - RemoteActionExecutionContext remoteActionExecutionContext, - RemotePathResolver remotePathResolver, - BlobPolicy blobPolicy) - throws IOException { - ArrayList> subTreeFutures = new ArrayList<>(); + private ListenableFuture< + ImmutableMap< + ? extends Map.Entry, MerkleTree.RootOnly>> + precomputeSubTrees( + Collection> sortedInputs, + Predicate isToolInput, + InputMetadataProvider metadataProvider, + ArtifactPathResolver artifactPathResolver, + RemoteActionExecutionContext remoteActionExecutionContext, + RemotePathResolver remotePathResolver, + BlobPolicy blobPolicy) + throws IOException { + var subTreeFutures = + new ArrayList< + ListenableFuture< + Map.Entry, MerkleTree.RootOnly>>>(); for (var entry : sortedInputs) { var future = maybeCacheSubtree( @@ -639,14 +650,14 @@ private CompletableFuture cacheSubTrees( remotePathResolver, blobPolicy); if (future != null) { - subTreeFutures.add(future); + subTreeFutures.add(transform(future, subTree -> entry(entry, subTree), directExecutor())); } } - return allOf(subTreeFutures.toArray(CompletableFuture[]::new)); + return transform(allAsList(subTreeFutures), ImmutableMap::copyOf, directExecutor()); } @Nullable - private CompletableFuture maybeCacheSubtree( + private ListenableFuture maybeCacheSubtree( @Nullable ActionInput input, PathFragment mappedExecPath, Predicate isToolInput, @@ -698,7 +709,7 @@ yield computeIfAbsent( }; } - private CompletableFuture computeForRunfilesTreeIfAbsent( + private ListenableFuture computeForRunfilesTreeIfAbsent( RunfilesArtifactValue runfilesArtifactValue, PathFragment mappedExecPath, Predicate isToolInput, @@ -740,7 +751,7 @@ private CompletableFuture computeForRunfilesTreeIfAbsent( blobPolicy); } - private CompletableFuture computeForTreeArtifactIfAbsent( + private ListenableFuture computeForTreeArtifactIfAbsent( TreeArtifactValue treeArtifactValue, PathFragment mappedExecPath, Predicate isToolInput, @@ -779,7 +790,7 @@ private interface SortedInputsSupplier { throws IOException, InterruptedException; } - private CompletableFuture computeIfAbsent( + private ListenableFuture computeIfAbsent( FileArtifactValue metadata, SortedInputsSupplier sortedInputsSupplier, boolean isTool, @@ -796,41 +807,46 @@ private CompletableFuture computeIfAbsent( if (cachedRoot != null && (blobPolicy == BlobPolicy.DISCARD || cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) { - return completedFuture(cachedRoot); + return immediateFuture(cachedRoot); } } var inFlightCacheKey = new InFlightCacheKey(metadata, isTool, blobPolicy != BlobPolicy.DISCARD); if (blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD) { - inFlightSubTreeCache.synchronous().invalidate(inFlightCacheKey); + inFlightSubTreeCache.remove(inFlightCacheKey); } - return inFlightSubTreeCache - .get( + var newlyComputed = new AtomicBoolean(); + var future = + inFlightSubTreeCache.computeIfAbsent( inFlightCacheKey, - (key, unusedExecutor) -> { - // There is a window in which a concurrent call may have removed the in-flight cache - // entry while this one had already passed the check above. Recheck the persistent - // cache to avoid unnecessary work. - var cachedRoot = persistentCache.getIfPresent(metadata); - if (cachedRoot != null - && (blobPolicy == BlobPolicy.DISCARD - || cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) { - return completedFuture(cachedRoot); - } - // An ongoing computation with blobs can be reused for one that doesn't require them. - if (blobPolicy == BlobPolicy.DISCARD) { - var inFlightComputation = - inFlightSubTreeCache.getIfPresent( - new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true)); - if (inFlightComputation != null) { - return inFlightComputation; - } - } - return supplyAsync( - () -> { - try { - // Subtrees either consist entirely of tool inputs or don't contain any. - // The same applies to scrubbed inputs. - return build( + unusedKey -> { + newlyComputed.set(true); + return submitAsync( + () -> { + // There is a window in which a concurrent call may have removed the in-flight + // cache entry while this one had already passed the check above. Recheck the + // persistent cache to avoid unnecessary work. + var cachedRoot = persistentCache.getIfPresent(metadata); + if (cachedRoot != null + && (blobPolicy == BlobPolicy.DISCARD + || cachedRoot instanceof MerkleTree.RootOnly.BlobsUploaded)) { + return immediateFuture(cachedRoot); + } + // An ongoing computation with blobs can be reused for one that doesn't require + // them. + if (blobPolicy == BlobPolicy.DISCARD) { + var inFlightComputation = + inFlightSubTreeCache.get( + new InFlightCacheKey(metadata, isTool, /* uploadBlobs= */ true)); + if (inFlightComputation != null) { + return inFlightComputation; + } + } + ListenableFuture merkleTreeFuture; + try { + // Subtrees either consist entirely of tool inputs or don't contain any. + // The same applies to scrubbed inputs. + merkleTreeFuture = + build( sortedInputsSupplier.compute(), isTool ? alwaysTrue() : alwaysFalse(), /* spawnScrubber= */ null, @@ -839,55 +855,54 @@ private CompletableFuture computeIfAbsent( remoteActionExecutionContext, remotePathResolver, blobPolicy); - } catch (IOException e) { - throw new WrappedException(e); - } catch (InterruptedException e) { - throw new WrappedException(e); - } - }, - MERKLE_TREE_BUILD_POOL) - .thenApplyAsync( - merkleTree -> { - if (merkleTree instanceof MerkleTree.Uploadable uploadable) { - try { - if (merkleTreeUploader != null) { - merkleTreeUploader.ensureInputsPresent( - remoteActionExecutionContext, - uploadable, - blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD, - remotePathResolver); + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); + } + return transform( + merkleTreeFuture, + merkleTree -> { + if (merkleTree instanceof MerkleTree.Uploadable uploadable) { + try { + if (merkleTreeUploader != null) { + merkleTreeUploader.ensureInputsPresent( + remoteActionExecutionContext, + uploadable, + blobPolicy == BlobPolicy.KEEP_AND_REUPLOAD, + remotePathResolver); + } + } catch (IOException e) { + throw new WrappedException(e); + } catch (InterruptedException e) { + throw new WrappedException(e); } - } catch (IOException e) { - throw new WrappedException(e); - } catch (InterruptedException e) { - throw new WrappedException(e); } - } - // Move the computed root to the persistent cache so that it can be reused - // by later builds. - persistentCache - .asMap() - .compute( - metadata, - (unused, oldRoot) -> { - // Don't downgrade the cached root from one indicating that its - // blobs have been uploaded. - return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded - ? oldRoot - : merkleTree.root(); - }); - return merkleTree.root(); - }, - MERKLE_TREE_UPLOAD_POOL); - }) - // This part of the future must be kept outside the cache lambda to avoid recursive updates - // to the in-flight cache. - .thenApply( - root -> { - // Clean up the in-flight cache so that it doesn't grow unboundedly. - inFlightSubTreeCache.synchronous().invalidate(inFlightCacheKey); - return root; + // Move the computed root to the persistent cache so that it can be reused + // by later builds. + persistentCache + .asMap() + .compute( + metadata, + (unused, oldRoot) -> { + // Don't downgrade the cached root from one indicating that its + // blobs have been uploaded. + return oldRoot instanceof MerkleTree.RootOnly.BlobsUploaded + ? oldRoot + : merkleTree.root(); + }); + return merkleTree.root(); + }, + MERKLE_TREE_UPLOAD_POOL); + }, + MERKLE_TREE_BUILD_POOL); }); + if (newlyComputed.get()) { + // Clean up the in-flight cache so that it doesn't grow unboundedly. + future.addListener( + () -> inFlightSubTreeCache.remove(inFlightCacheKey, future), directExecutor()); + } + return future; } private static T getFromFuture(Future future) throws IOException, InterruptedException { diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index f3584298e63bd5..e44cd8aa0e1593 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -146,6 +146,7 @@ import java.util.Collections; import java.util.Random; import java.util.SortedMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -747,8 +748,30 @@ public void buildRemoteAction_goldenTest(@TestParameter({"1", "2", "3"}) int see }); assertThat(digestUtil.compute(rootDirectory)).isEqualTo(expectedDigest); - assertThat(service.buildRemoteAction(spawn, context).getMerkleTree().digest()) - .isEqualTo(expectedDigest); + + // Verify that multiple concurrent Merkle tree builds all produce the same result and don't + // interfere with each other. + var exceptions = new ConcurrentLinkedDeque(); + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + for (int i = 0; i < 16; i++) { + executor.execute( + () -> { + try { + assertThat(service.buildRemoteAction(spawn, context).getMerkleTree().digest()) + .isEqualTo(expectedDigest); + } catch (Throwable e) { + exceptions.add(e); + } + }); + } + } + if (!exceptions.isEmpty()) { + var combinedException = new AssertionError("Exceptions in golden test runs:"); + for (var e : exceptions) { + combinedException.addSuppressed(e); + } + throw combinedException; + } } private FileNode file(String name, String content) {