-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(pip): avoid response buffering #873
base: main
Are you sure you want to change the base?
Changes from all commits
7f7521b
eb61278
e5cd04e
9fe6303
469cadb
c67ec8d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,8 @@ public sealed class PyPiClient : IPyPiClient, IDisposable | |
|
||
private static readonly ProductInfoHeaderValue CommentValue = new("(+https://github.com/microsoft/component-detection)"); | ||
|
||
private static readonly TimeSpan CacheSlidingExpiration = TimeSpan.FromSeconds(CACHEINTERVALSECONDS); | ||
|
||
// Keep telemetry on how the cache is being used for future refinements | ||
private readonly PypiCacheTelemetryRecord cacheTelemetry; | ||
|
||
|
@@ -63,7 +65,13 @@ public sealed class PyPiClient : IPyPiClient, IDisposable | |
/// A thread safe cache implementation which contains a mapping of URI -> HttpResponseMessage | ||
/// and has a limited number of entries which will expire after the cache fills or a specified interval. | ||
/// </summary> | ||
private MemoryCache cachedResponses = new MemoryCache(new MemoryCacheOptions { SizeLimit = DEFAULTCACHEENTRIES }); | ||
private MemoryCache cachedResponses = new(new MemoryCacheOptions { SizeLimit = DEFAULTCACHEENTRIES }); | ||
|
||
/// <summary> | ||
/// A thread safe cache mapping <see cref="Uri"/> -> <see cref="List<PipDependencySpecification>"/> | ||
/// and has a limited number of entries which will expire after the cache fills or a specified interval. | ||
/// </summary> | ||
private MemoryCache cachedDependencies = new(new MemoryCacheOptions { SizeLimit = DEFAULTCACHEENTRIES }); | ||
|
||
public PyPiClient() => this.cacheTelemetry = new PypiCacheTelemetryRecord() | ||
{ | ||
|
@@ -84,54 +92,88 @@ public PyPiClient(IEnvironmentVariableService environmentVariableService, ILogge | |
|
||
public static HttpClient HttpClient { get; internal set; } = new HttpClient(HttpClientHandler); | ||
|
||
public async Task<IList<PipDependencySpecification>> FetchPackageDependenciesAsync(string name, string version, PythonProjectRelease release) | ||
private static async Task<HttpResponseMessage> SendGetRequestAsync( | ||
Uri uri, | ||
HttpCompletionOption completionOption = HttpCompletionOption.ResponseContentRead) | ||
{ | ||
var dependencies = new List<PipDependencySpecification>(); | ||
using var request = new HttpRequestMessage(HttpMethod.Get, uri); | ||
request.Headers.UserAgent.Add(ProductValue); | ||
request.Headers.UserAgent.Add(CommentValue); | ||
var response = await HttpClient.SendAsync(request, completionOption); | ||
return response; | ||
} | ||
|
||
public async Task<IList<PipDependencySpecification>> FetchPackageDependenciesAsync(string name, string version, PythonProjectRelease release) | ||
{ | ||
var uri = release.Url; | ||
var response = await this.GetAndCachePyPiResponseAsync(uri); | ||
var key = new CacheDependencyKey(name, version, uri); | ||
|
||
if (!response.IsSuccessStatusCode) | ||
return (await this.cachedDependencies.GetOrCreateAsync(key, FetchDependencies)).ToList(); | ||
|
||
async Task<List<PipDependencySpecification>> FetchDependencies(ICacheEntry cacheEntry) | ||
{ | ||
this.logger.LogWarning("Http GET at {ReleaseUrl} failed with status code {ResponseStatusCode}", release.Url, response.StatusCode); | ||
return dependencies; | ||
} | ||
cacheEntry.SlidingExpiration = CacheSlidingExpiration; | ||
cacheEntry.Size = 1; | ||
using var response = await SendGetRequestAsync(uri, HttpCompletionOption.ResponseHeadersRead); | ||
|
||
var package = new ZipArchive(await response.Content.ReadAsStreamAsync()); | ||
if (!response.IsSuccessStatusCode) | ||
{ | ||
this.logger.LogWarning( | ||
"Http GET at {ReleaseUrl} failed with status code {ResponseStatusCode}", | ||
release.Url, | ||
response.StatusCode); | ||
return new List<PipDependencySpecification>(); | ||
} | ||
|
||
var entry = package.GetEntry($"{name.Replace('-', '_')}-{version}.dist-info/METADATA"); | ||
// Store the .whl file on the disk temporarily | ||
var tempFilePath = Path.GetTempFileName(); | ||
await using (var fileStreamWrite = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this needed? I know these are temp files, but depending on how big the file can be, couldn't writing to disk cause errors for systems that are particularly low on disk space? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We encountered the opposite problem very often. We had enough disk space, but got OOM when buffering (and caching) large .whl files into memory in the HTTP Perhaps we can make this behavior configurable, so that consumers will be able to select whether to use disk storage for the downloads? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can start by adding some telemetry on the size of some wheels. I am a bit worried that we would eat up all the disk space downloading the wheels to disk, as technically we'd download them twice in a given CI build ( A slider or a configurable limit on either:
sounds like the best options. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about we have 2 options:
Would that be fine? It would leave current behavior unchanged, but give an option to use disk space instead of process memory for those who want it. A more fine-grained control like you suggested can be added at a later point if necessary. We can set this with an environment variable (or maybe even an |
||
new FileStream(tempFilePath, FileMode.Create, FileAccess.Write, FileShare.None)) | ||
{ | ||
await (await response.Content.ReadAsStreamAsync()).CopyToAsync(fileStreamWrite); | ||
} | ||
|
||
// If there is no metadata file, the package doesn't have any declared dependencies | ||
if (entry == null) | ||
{ | ||
return dependencies; | ||
} | ||
var content = new List<string>(); | ||
await using (var fileStreamRead = | ||
new FileStream(tempFilePath, FileMode.Open, FileAccess.Read, FileShare.None)) | ||
{ | ||
using var package = new ZipArchive(fileStreamRead); | ||
|
||
var content = new List<string>(); | ||
using (var stream = entry.Open()) | ||
{ | ||
using var streamReader = new StreamReader(stream); | ||
var entry = package.GetEntry($"{name.Replace('-', '_')}-{version}.dist-info/METADATA"); | ||
|
||
while (!streamReader.EndOfStream) | ||
{ | ||
var line = await streamReader.ReadLineAsync(); | ||
// If there is no metadata file, the package doesn't have any declared dependencies | ||
if (entry == null) | ||
{ | ||
return new List<PipDependencySpecification>(); | ||
} | ||
|
||
await using var stream = entry.Open(); | ||
using var streamReader = new StreamReader(stream); | ||
|
||
if (PipDependencySpecification.RequiresDistRegex.IsMatch(line)) | ||
while (!streamReader.EndOfStream) | ||
{ | ||
content.Add(line); | ||
var line = await streamReader.ReadLineAsync(); | ||
if (line is null) | ||
{ | ||
continue; | ||
} | ||
|
||
if (PipDependencySpecification.RequiresDistRegex.IsMatch(line)) | ||
{ | ||
content.Add(line); | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Pull the packages that aren't conditional based on "extras" | ||
// Right now we just want to resolve the graph as most comsumers will | ||
// experience it | ||
foreach (var deps in content.Where(x => !x.Contains("extra =="))) | ||
{ | ||
dependencies.Add(new PipDependencySpecification(deps, true)); | ||
} | ||
File.Delete(tempFilePath); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if this fails? The detector can run in parallel and potentially cause lockups. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code uses a different temporary path every time ( |
||
|
||
return dependencies; | ||
// Pull the packages that aren't conditional based on "extras" | ||
// Right now we just want to resolve the graph as most consumers will | ||
// experience it | ||
return content.Where(x => !x.Contains("extra ==")) | ||
.Select(deps => new PipDependencySpecification(deps, true)) | ||
.ToList(); | ||
} | ||
} | ||
|
||
public async Task<SortedDictionary<string, IList<PythonProjectRelease>>> GetReleasesAsync(PipDependencySpecification spec) | ||
|
@@ -247,15 +289,13 @@ private async Task<HttpResponseMessage> GetAndCachePyPiResponseAsync(Uri uri) | |
} | ||
|
||
this.logger.LogInformation("Getting Python data from {Uri}", uri); | ||
using var request = new HttpRequestMessage(HttpMethod.Get, uri); | ||
request.Headers.UserAgent.Add(ProductValue); | ||
request.Headers.UserAgent.Add(CommentValue); | ||
var response = await HttpClient.SendAsync(request); | ||
var response = await SendGetRequestAsync(uri); | ||
|
||
// The `first - wins` response accepted into the cache. This might be different from the input if another caller wins the race. | ||
return await this.cachedResponses.GetOrCreateAsync(uri, cacheEntry => | ||
{ | ||
cacheEntry.SlidingExpiration = TimeSpan.FromSeconds(CACHEINTERVALSECONDS); // This entry will expire after CACHEINTERVALSECONDS seconds from last use | ||
cacheEntry.SlidingExpiration = | ||
CacheSlidingExpiration; // This entry will expire after CACHEINTERVALSECONDS seconds from last use | ||
cacheEntry.Size = 1; // Specify a size of 1 so a set number of entries can always be in the cache | ||
return Task.FromResult(response); | ||
}); | ||
|
@@ -272,6 +312,7 @@ private void InitializeNonDefaultMemoryCache() | |
{ | ||
this.logger.LogInformation("Setting IPyPiClient max cache entries to {MaxEntries}", maxEntries); | ||
this.cachedResponses = new MemoryCache(new MemoryCacheOptions { SizeLimit = maxEntries }); | ||
this.cachedDependencies = new MemoryCache(new MemoryCacheOptions { SizeLimit = maxEntries }); | ||
} | ||
|
||
this.checkedMaxEntriesVariable = true; | ||
|
@@ -282,5 +323,8 @@ public void Dispose() | |
this.cacheTelemetry.FinalCacheSize = this.cachedResponses.Count; | ||
this.cacheTelemetry.Dispose(); | ||
this.cachedResponses.Dispose(); | ||
this.cachedDependencies.Dispose(); | ||
} | ||
|
||
private record CacheDependencyKey(string Name, string Version, Uri Uri); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: .ToList might be redundant since the task result is already a list and it eventually gets converted to dictionary in pythonresolver.