Skip to content

Commit

Permalink
Widen interface for action input prefetching.
Browse files Browse the repository at this point in the history
RELNOTES: None
PiperOrigin-RevId: 703221309
Change-Id: Ie7a4607b530a01e49f671bc6804a307740c81175
  • Loading branch information
ericfelly authored and copybara-github committed Dec 5, 2024
1 parent 5fc2b01 commit b024eea
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,10 @@

/** Prefetches files to local disk. */
public interface ActionInputPrefetcher {
/**
* Returns the metadata for an {@link ActionInput}.
*
* <p>This will generally call through to a {@link InputMetadataProvider} and ask for the metadata
* of either an input or an output artifact.
*/
public interface MetadataSupplier {
FileArtifactValue getMetadata(ActionInput actionInput) throws IOException, InterruptedException;
}

public static final ActionInputPrefetcher NONE =
(action, inputs, metadataSupplier, priority) -> {
// Do nothing.
return immediateVoidFuture();
};
(action, inputs, metadataProvider, priority) ->
// Do nothing.
immediateVoidFuture();

/** Priority for the staging task. */
public enum Priority {
Expand Down Expand Up @@ -70,6 +59,6 @@ public enum Priority {
ListenableFuture<Void> prefetchFiles(
ActionExecutionMetadata action,
Iterable<? extends ActionInput> inputs,
MetadataSupplier metadataSupplier,
InputMetadataProvider metadataProvider,
Priority priority);
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public ListenableFuture<Void> prefetchInputs() throws ForbiddenActionInputExcept
spawn.getResourceOwner(),
getInputMapping(PathFragment.EMPTY_FRAGMENT, /* willAccessRepeatedly= */ true)
.values(),
getInputMetadataProvider()::getInputMetadata,
getInputMetadataProvider(),
Priority.MEDIUM),
BulkTransferException.class,
(BulkTransferException e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact;
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.InputMetadataProvider;
import com.google.devtools.build.lib.actions.cache.OutputMetadataStore;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.events.Reporter;
Expand Down Expand Up @@ -89,6 +90,17 @@ enum DirectoryState {
OUTPUT_PERMISSIONS
}

/**
* Returns the metadata for an {@link ActionInput}.
*
* <p>This will generally call through to a {@link InputMetadataProvider} and ask for the metadata
* of either an input or an output artifact.
*/
@VisibleForTesting
public interface MetadataSupplier {
FileArtifactValue getMetadata(ActionInput actionInput) throws IOException, InterruptedException;
}

/**
* Tracks directory permissions to minimize filesystem operations.
*
Expand Down Expand Up @@ -260,6 +272,30 @@ protected void prefetchVirtualActionInput(VirtualActionInput input) throws IOExc
*/
@Override
public ListenableFuture<Void> prefetchFiles(
ActionExecutionMetadata action,
Iterable<? extends ActionInput> inputs,
InputMetadataProvider metadataProvider,
Priority priority) {
return prefetchFilesInterruptibly(action, inputs, metadataProvider::getInputMetadata, priority);
}

/**
* Fetches remotely stored action outputs, that are inputs to this spawn, and stores them under
* their path in the output base.
*
* <p>The {@code inputs} may not contain any unexpanded directories.
*
* <p>This method is safe to be called concurrently from spawn runners before running any local
* spawn.
*
* <p>This method is similar to #prefetchFiles() above, but note that {@code metadataSupplier} may
* throw {@link InterruptedException}. If it does, this method will propagate this exception in
* the returned future.
*
* @return a future that is completed once all downloads have finished.
*/
@VisibleForTesting
public ListenableFuture<Void> prefetchFilesInterruptibly(
ActionExecutionMetadata action,
Iterable<? extends ActionInput> inputs,
MetadataSupplier metadataSupplier,
Expand Down Expand Up @@ -686,7 +722,7 @@ public void finalizeAction(Action action, OutputMetadataStore outputMetadataStor
if (!outputsToDownload.isEmpty()) {
try (var s = Profiler.instance().profile(ProfilerTask.REMOTE_DOWNLOAD, "Download outputs")) {
getFromFuture(
prefetchFiles(
prefetchFilesInterruptibly(
action, outputsToDownload, outputMetadataStore::getOutputMetadata, Priority.HIGH));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,7 @@ ActionInput getInput(String execPath) {
@Nullable
@VisibleForTesting
FileArtifactValue getInputMetadata(ActionInput input) {
PathFragment execPath = input.getExecPath();
return inputArtifactData.getMetadata(execPath);
return inputArtifactData.getInputMetadata(input);
}

private void downloadFileIfRemote(PathFragment path) throws IOException {
Expand All @@ -753,7 +752,7 @@ private void downloadFileIfRemote(PathFragment path) throws IOException {
}
getFromFuture(
inputFetcher.prefetchFiles(
action, ImmutableList.of(input), this::getInputMetadata, Priority.CRITICAL));
action, ImmutableList.of(input), inputArtifactData, Priority.CRITICAL));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(String.format("Received interrupt while fetching file '%s'", path), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,7 @@ private void downloadArtifact(
var action =
ActionUtils.getActionForLookupData(env, derivedArtifact.getGeneratingActionKey());
var future =
actionInputPrefetcher.prefetchFiles(
action, filesToDownload, inputMap::getInputMetadata, Priority.LOW);
actionInputPrefetcher.prefetchFiles(action, filesToDownload, inputMap, Priority.LOW);
futures.add(future);
}
} else {
Expand All @@ -485,7 +484,7 @@ private void downloadArtifact(
ActionUtils.getActionForLookupData(env, derivedArtifact.getGeneratingActionKey());
var future =
actionInputPrefetcher.prefetchFiles(
action, ImmutableList.of(artifact), inputMap::getInputMetadata, Priority.LOW);
action, ImmutableList.of(artifact), inputMap, Priority.LOW);
futures.add(future);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputPrefetcher.MetadataSupplier;
import com.google.devtools.build.lib.actions.ActionInputPrefetcher.Priority;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact;
Expand All @@ -56,6 +55,7 @@
import com.google.devtools.build.lib.actions.FileArtifactValue;
import com.google.devtools.build.lib.actions.FileArtifactValue.RemoteFileArtifactValue;
import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
import com.google.devtools.build.lib.remote.AbstractActionInputPrefetcher.MetadataSupplier;
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.skyframe.TreeArtifactValue;
import com.google.devtools.build.lib.testing.vfs.SpiedFileSystem;
Expand Down Expand Up @@ -296,7 +296,9 @@ public void prefetchFiles_fileExists_doNotDownload()
FileSystemUtils.writeContent(a.getPath(), "hello world".getBytes(UTF_8));
AbstractActionInputPrefetcher prefetcher = spy(createPrefetcher(cas));

wait(prefetcher.prefetchFiles(action, metadata.keySet(), metadata::get, Priority.MEDIUM));
wait(
prefetcher.prefetchFilesInterruptibly(
action, metadata.keySet(), metadata::get, Priority.MEDIUM));

verify(prefetcher, never()).doDownloadFile(eq(action), any(), any(), any(), any(), any());
assertThat(prefetcher.downloadedFiles()).containsExactly(a.getPath());
Expand All @@ -312,7 +314,9 @@ public void prefetchFiles_fileExistsButContentMismatches_download()
FileSystemUtils.writeContent(a.getPath(), "hello world local".getBytes(UTF_8));
AbstractActionInputPrefetcher prefetcher = spy(createPrefetcher(cas));

wait(prefetcher.prefetchFiles(action, metadata.keySet(), metadata::get, Priority.MEDIUM));
wait(
prefetcher.prefetchFilesInterruptibly(
action, metadata.keySet(), metadata::get, Priority.MEDIUM));

verify(prefetcher).doDownloadFile(eq(action), any(), any(), eq(a.getExecPath()), any(), any());
assertThat(prefetcher.downloadedFiles()).containsExactly(a.getPath());
Expand All @@ -328,7 +332,9 @@ public void prefetchFiles_downloadRemoteFiles() throws Exception {
Artifact a2 = createRemoteArtifact("file2", "fizz buzz", metadata, cas);
AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);

wait(prefetcher.prefetchFiles(action, metadata.keySet(), metadata::get, Priority.MEDIUM));
wait(
prefetcher.prefetchFilesInterruptibly(
action, metadata.keySet(), metadata::get, Priority.MEDIUM));

assertThat(FileSystemUtils.readContent(a1.getPath(), UTF_8)).isEqualTo("hello world");
assertReadableNonWritableAndExecutable(a1.getPath());
Expand All @@ -346,7 +352,9 @@ public void prefetchFiles_downloadRemoteFiles_withMaterializationExecPath() thro
Artifact a = createRemoteArtifact("file", "hello world", targetExecPath, metadata, cas);
AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);

wait(prefetcher.prefetchFiles(action, metadata.keySet(), metadata::get, Priority.MEDIUM));
wait(
prefetcher.prefetchFilesInterruptibly(
action, metadata.keySet(), metadata::get, Priority.MEDIUM));

assertThat(a.getPath().isSymbolicLink()).isTrue();
assertThat(a.getPath().readSymbolicLink())
Expand Down Expand Up @@ -376,7 +384,7 @@ public void prefetchFiles_downloadRemoteTrees() throws Exception {

AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);

wait(prefetcher.prefetchFiles(action, children, metadata::get, Priority.MEDIUM));
wait(prefetcher.prefetchFilesInterruptibly(action, children, metadata::get, Priority.MEDIUM));

assertThat(FileSystemUtils.readContent(firstChild.getPath(), UTF_8)).isEqualTo("content1");
assertThat(FileSystemUtils.readContent(secondChild.getPath(), UTF_8)).isEqualTo("content2");
Expand Down Expand Up @@ -407,7 +415,7 @@ public void prefetchFiles_downloadRemoteTrees_partial() throws Exception {
AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);

wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(firstChild, secondChild), metadata::get, Priority.MEDIUM));

assertThat(firstChild.getPath().exists()).isFalse();
Expand Down Expand Up @@ -437,7 +445,7 @@ public void prefetchFiles_downloadRemoteTrees_withMaterializationExecPath() thro

AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);

wait(prefetcher.prefetchFiles(action, children, metadata::get, Priority.MEDIUM));
wait(prefetcher.prefetchFilesInterruptibly(action, children, metadata::get, Priority.MEDIUM));

assertThat(tree.getPath().isSymbolicLink()).isTrue();
assertThat(tree.getPath().readSymbolicLink())
Expand Down Expand Up @@ -475,13 +483,13 @@ public void prefetchFiles_downloadRemoteTrees_forActionTemplateExpansion() throw
AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);

wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(firstChild), metadata::get, Priority.MEDIUM));

assertTreeReadableWritableAndExecutable(tree.getPath());

wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(secondChild), metadata::get, Priority.MEDIUM));

assertTreeReadableWritableAndExecutable(tree.getPath());
Expand All @@ -497,7 +505,7 @@ public void prefetchFiles_missingFiles_fails() throws Exception {
Exception.class,
() ->
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(a), metadata::get, Priority.MEDIUM)));

assertThat(prefetcher.downloadedFiles()).isEmpty();
Expand All @@ -515,7 +523,9 @@ public void prefetchFiles_ignoreNonRemoteFiles() throws Exception {
ImmutableMap<ActionInput, FileArtifactValue> metadata = ImmutableMap.of(a, f);
AbstractActionInputPrefetcher prefetcher = createPrefetcher(new HashMap<>());

wait(prefetcher.prefetchFiles(action, ImmutableList.of(a), metadata::get, Priority.MEDIUM));
wait(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(a), metadata::get, Priority.MEDIUM));

assertThat(prefetcher.downloadedFiles()).isEmpty();
assertThat(prefetcher.downloadsInProgress()).isEmpty();
Expand All @@ -541,7 +551,7 @@ public void prefetchFiles_ignoreNonRemoteFiles_tree() throws Exception {

AbstractActionInputPrefetcher prefetcher = createPrefetcher(cas);

wait(prefetcher.prefetchFiles(action, children, metadata::get, Priority.MEDIUM));
wait(prefetcher.prefetchFilesInterruptibly(action, children, metadata::get, Priority.MEDIUM));

assertThat(firstChild.getPath().exists()).isFalse();
assertThat(FileSystemUtils.readContent(secondChild.getPath(), UTF_8)).isEqualTo("content2");
Expand Down Expand Up @@ -573,7 +583,7 @@ public void prefetchFiles_treeFiles_minimizeFilesystemOperations() throws Except
reset(fs);

wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(firstChild, secondChild), metadata::get, Priority.MEDIUM));

verify(fs).createWritableDirectory(root);
Expand All @@ -584,7 +594,7 @@ public void prefetchFiles_treeFiles_minimizeFilesystemOperations() throws Except
reset(fs);

wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(firstChild, secondChild), metadata::get, Priority.MEDIUM));

verify(fs, never()).createWritableDirectory(root);
Expand Down Expand Up @@ -623,7 +633,7 @@ public void prefetchFiles_treeFiles_multipleThreads_waitForPermissionsToBeSet()
Callable<Void> prefetch =
() -> {
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(child), metadata::get, Priority.MEDIUM));
assertTreeReadableNonWritableAndExecutable(tree.getPath());
return null;
Expand Down Expand Up @@ -657,7 +667,7 @@ public void prefetchFiles_multipleThreads_downloadIsCancelled() throws Exception
() -> {
try {
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(artifact), metadata::get, Priority.MEDIUM));
} catch (IOException | ExecException | InterruptedException ignored) {
// do nothing
Expand All @@ -669,7 +679,7 @@ public void prefetchFiles_multipleThreads_downloadIsCancelled() throws Exception
() -> {
try {
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(artifact), metadata::get, Priority.MEDIUM));
} catch (IOException | ExecException | InterruptedException ignored) {
// do nothing
Expand Down Expand Up @@ -707,7 +717,7 @@ public void prefetchFiles_multipleThreads_downloadIsNotCancelledByOtherThreads()
() -> {
try {
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(artifact), metadata::get, Priority.MEDIUM));
} catch (IOException | ExecException | InterruptedException ignored) {
// do nothing
Expand All @@ -720,7 +730,7 @@ public void prefetchFiles_multipleThreads_downloadIsNotCancelledByOtherThreads()
() -> {
try {
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(artifact), metadata::get, Priority.MEDIUM));
successful.set(true);
} catch (IOException | ExecException | InterruptedException ignored) {
Expand Down Expand Up @@ -764,7 +774,7 @@ public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throw
};

ListenableFuture<Void> future =
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM);

assertThrows(InterruptedException.class, () -> getFromFuture(future));
Expand Down Expand Up @@ -792,7 +802,7 @@ public void prefetchFiles_onInterrupt_deletePartialDownloadedFile() throws Excep
() -> {
try {
getFromFuture(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(a1), metadata::get, Priority.MEDIUM));
} catch (IOException ignored) {
// Intentionally left empty
Expand Down Expand Up @@ -831,7 +841,7 @@ public void onLostInputsEvent(LostInputsEvent event) {
Exception.class,
() ->
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, metadata.keySet(), metadata::get, Priority.MEDIUM)));

assertThat(lostInputsEvents).hasSize(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testStagingVirtualActionInput() throws Exception {

// act
wait(
actionInputFetcher.prefetchFiles(
actionInputFetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(a), (ActionInput unused) -> null, Priority.MEDIUM));

// assert
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testStagingEmptyVirtualActionInput() throws Exception {

// act
wait(
actionInputFetcher.prefetchFiles(
actionInputFetcher.prefetchFilesInterruptibly(
action,
ImmutableList.of(VirtualActionInput.EMPTY_MARKER),
(ActionInput unused) -> null,
Expand All @@ -153,7 +153,7 @@ public void prefetchFiles_missingFiles_failsWithSpecificMessage() throws Excepti
BulkTransferException.class,
() ->
wait(
prefetcher.prefetchFiles(
prefetcher.prefetchFilesInterruptibly(
action, ImmutableList.of(a), metadata::get, Priority.MEDIUM)));

assertThat(prefetcher.downloadedFiles()).isEmpty();
Expand Down

0 comments on commit b024eea

Please sign in to comment.