Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix for PostProcess Stats job #10282

Open
wants to merge 13 commits into
base: dev
Choose a base branch
from
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
"rollForward": "latestFeature",
"allowPrerelease": false
}
}
}
7 changes: 5 additions & 2 deletions src/NuGet.Services.Storage/AggregateStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand Down Expand Up @@ -111,9 +111,12 @@ public override async Task<IEnumerable<StorageListItem>> ListAsync(bool getMetad
return await _primaryStorage.ListAsync(getMetadata, cancellationToken);
}

/// <summary>
/// Lists the top-level items by delegating to the primary storage.
/// Only <c>_primaryStorage</c> is consulted by this method.
/// </summary>
/// <param name="getMetadata">Forwarded unchanged to the primary storage.</param>
/// <param name="cancellationToken">Forwarded unchanged to the primary storage.</param>
/// <returns>The items returned by the primary storage.</returns>
public override async Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken)
{
    var topLevelItems = await _primaryStorage.ListTopLevelAsync(getMetadata, cancellationToken);
    return topLevelItems;
}

/// <summary>
/// Not implemented for this storage type; every call throws.
/// </summary>
/// <param name="resourceUri">Unused.</param>
/// <param name="metadata">Unused.</param>
/// <exception cref="NotImplementedException">Thrown synchronously on every call.</exception>
public override Task SetMetadataAsync(Uri resourceUri, IDictionary<string, string> metadata) =>
    throw new NotImplementedException();
}
}
}
24 changes: 23 additions & 1 deletion src/NuGet.Services.Storage/AzureStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,29 @@ public override async Task<IEnumerable<StorageListItem>> ListAsync(bool getMetad

await foreach (BlobHierarchyItem blob in _directory.GetBlobsByHierarchyAsync(traits: blobTraits, prefix: _path))
{
blobList.Add(await GetStorageListItemAsync(_directory.GetBlockBlobClient(blob.Blob.Name)));
blobList.Add(await GetStorageListItemAsync(_directory.GetBlockBlobClient(blob.Blob.Name)));
}

return blobList;
}

public override async Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken)
{
var prefix = _path.Trim('/') + '/';
var blobTraits = new BlobTraits();
if (getMetadata)
{
blobTraits |= BlobTraits.Metadata;
}

var blobList = new List<StorageListItem>();

await foreach (BlobHierarchyItem blob in _directory.GetBlobsByHierarchyAsync(traits: blobTraits, prefix: prefix, delimiter: "/"))
{
if (!blob.IsPrefix)
{
blobList.Add(await GetStorageListItemAsync(_directory.GetBlockBlobClient(blob.Blob.Name)));
}
}

return blobList;
Expand Down
7 changes: 6 additions & 1 deletion src/NuGet.Services.Storage/FileStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand Down Expand Up @@ -57,6 +57,11 @@ public override Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, C
return Task.FromResult<IEnumerable<StorageListItem>>(List(getMetadata));
}

/// <summary>
/// Top-level listing is not implemented for this storage type; every call throws.
/// </summary>
/// <param name="getMetadata">Unused.</param>
/// <param name="cancellationToken">Unused.</param>
/// <exception cref="NotImplementedException">Thrown synchronously on every call.</exception>
public override Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken) =>
    throw new NotImplementedException();

public string Path
{
get;
Expand Down
9 changes: 8 additions & 1 deletion src/NuGet.Services.Storage/IStorage.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,7 +20,13 @@ public interface IStorage
Uri BaseAddress { get; }
Uri ResolveUri(string relativeUri);
IEnumerable<StorageListItem> List(bool getMetadata);

/// <summary>
/// Lists all children of the storage (including the ones contained in subdirectories).
/// </summary>
Copy link
Contributor

@agr agr Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, use the documentation comment syntax

/// <summary>
/// ...
/// </summary>

Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, CancellationToken cancellationToken);

/// <summary>
/// Lists the immediate children of the storage, assuming a directory-like structure.
/// </summary>
Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken);
agr marked this conversation as resolved.
Show resolved Hide resolved

Task CopyAsync(
Uri sourceUri,
IStorage destinationStorage,
Expand Down
7 changes: 7 additions & 0 deletions src/NuGet.Services.Storage/Storage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public async Task<string> LoadString(Uri resourceUri, CancellationToken cancella
public abstract Task<bool> ExistsAsync(string fileName, CancellationToken cancellationToken);
public abstract IEnumerable<StorageListItem> List(bool getMetadata);
public abstract Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, CancellationToken cancellationToken);
public abstract Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken);

public bool Verbose
{
Expand Down Expand Up @@ -209,6 +210,12 @@ protected string GetName(Uri uri)
{
name = name.Substring(0, name.IndexOf("#"));
}

if (name.Contains("?"))
Copy link

@jimmylewis jimmylewis Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the query (?) comes before the fragment (#), so it would be better to do this check first. That way you don't allocate a substring twice if both exist.

[edit:] With reordering these, you can also make the second check an else if because if you find a ? you no longer need to check for #

Alternatively, you could do a single pass with IndexOfAny, which would also save a second iteration over the string.

{
name = name.Substring(0, name.IndexOf("?"));
}

return name;
}

Expand Down
21 changes: 11 additions & 10 deletions src/Stats.PostProcessReports/DetailedReportPostProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand All @@ -10,6 +10,7 @@
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
Expand Down Expand Up @@ -120,7 +121,7 @@ private async Task ProcessBlobs(List<StorageListItem> jsonBlobs, CancellationTok
foreach (var sourceBlob in jsonBlobs)
{
var blobName = GetBlobName(sourceBlob);
var workBlobUri = _workStorage.ResolveUri(blobName);
var workBlobUri = _workStorage.ResolveUri(_configuration.WorkPath + blobName);
var sourceBlobStats = new BlobStatistics();
var individualReports = await ProcessSourceBlobAsync(sourceBlob, sourceBlobStats, totals);
using (_logger.BeginScope("Processing {BlobName}", blobName))
Expand Down Expand Up @@ -154,7 +155,7 @@ private async Task ProcessBlobs(List<StorageListItem> jsonBlobs, CancellationTok
}
}
}
var jobSucceededUrl = _workStorage.ResolveUri(JobSucceededFilename);
var jobSucceededUrl = _workStorage.ResolveUri(_configuration.WorkPath + JobSucceededFilename);
var jobSucceededContent = new StringStorageContent("", TextContentType);
await _workStorage.Save(jobSucceededUrl, jobSucceededContent, overwrite: true, cancellationToken: cancellationToken);
_telemetryService.ReportTotals(totals.SourceFilesProcessed, totals.TotalLinesProcessed, totals.TotalFilesCreated, totals.TotalLinesFailed);
Expand Down Expand Up @@ -202,26 +203,26 @@ private async Task CopySourceBlobsAsync(List<StorageListItem> jsonBlobs, Cancell
foreach (var sourceBlob in jsonBlobs)
{
var blobName = GetBlobName(sourceBlob);
var targetUrl = _workStorage.ResolveUri(blobName);
var targetUrl = _workStorage.ResolveUri(_configuration.WorkPath + blobName);
_logger.LogInformation("{SourceBlobUri} ({BlobName})", sourceBlob.Uri.AbsoluteUri, blobName);
_logger.LogInformation("{WorkBlobUrl}", targetUrl);
await _sourceStorage.CopyAsync(sourceBlob.Uri, _workStorage, targetUrl, destinationProperties: null, cancellationToken);
}
var copySucceededContent = new StringStorageContent("", TextContentType);
var copySucceededUrl = _workStorage.ResolveUri(CopySucceededFilename);
var copySucceededUrl = _workStorage.ResolveUri(_configuration.WorkPath + CopySucceededFilename);
await _workStorage.Save(copySucceededUrl, copySucceededContent, overwrite: true, cancellationToken: cancellationToken);
}

/// <summary>
/// Lists the immediate (top-level) items of the source storage, requesting their metadata.
/// </summary>
/// <returns>The listed items, materialized into a <see cref="List{T}"/>.</returns>
private async Task<List<StorageListItem>> EnumerateSourceBlobsAsync()
{
    // A single declaration of `blobs`: the earlier recursive ListAsync call is dropped in
    // favor of the top-level listing, so items in nested "directories" are not returned.
    var blobs = await _sourceStorage.ListTopLevelAsync(getMetadata: true, cancellationToken: CancellationToken.None);

    return blobs.ToList();
}

/// <summary>
/// Lists the immediate (top-level) items of the work storage, requesting their metadata.
/// </summary>
/// <returns>The listed items, materialized into a <see cref="List{T}"/>.</returns>
private async Task<List<StorageListItem>> EnumerateWorkStorageBlobsAsync()
{
    // A single declaration of `blobs`: the earlier recursive ListAsync call is dropped in
    // favor of the top-level listing, so items in nested "directories" are not returned.
    var blobs = await _workStorage.ListTopLevelAsync(getMetadata: true, cancellationToken: CancellationToken.None);

    return blobs.ToList();
}
Expand All @@ -245,7 +246,7 @@ private async Task<ConcurrentBag<LineProcessingContext>> ProcessSourceBlobAsync(
var sw = Stopwatch.StartNew();
var numLines = 0;
var individualReports = new ConcurrentBag<LineProcessingContext>();
var workStorageUrl = _workStorage.ResolveUri(GetBlobName(sourceBlob));
var workStorageUrl = _workStorage.ResolveUri(_configuration.WorkPath + GetBlobName(sourceBlob));
var storageContent = await _workStorage.Load(workStorageUrl, CancellationToken.None);
using (var sourceStream = storageContent.GetContentStream())
using (var streamReader = new StreamReader(sourceStream))
Expand Down Expand Up @@ -311,7 +312,7 @@ private static bool BlobMetadataExists(StorageListItem sourceBlob, TotalStats to

private static string GetBlobName(StorageListItem blob)
{
var path = blob.Uri.AbsoluteUri;
var path = blob.Uri.GetComponents(UriComponents.Path, UriFormat.UriEscaped);
var lastSlash = path.LastIndexOf('/');
if (lastSlash < 0)
{
Expand Down Expand Up @@ -351,7 +352,7 @@ private async Task WriteReports(
continue;
}
var outFilename = $"recentpopularitydetail_{data.PackageId.ToLowerInvariant()}.json";
var destinationUri = _destinationStorage.ResolveUri(outFilename);
var destinationUri = _destinationStorage.ResolveUri(_configuration.DestinationPath + outFilename);
var storageContent = new StringStorageContent(details.Data, JsonContentType);

await _destinationStorage.Save(destinationUri, storageContent, overwrite: true, cancellationToken: cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ public override Task<IEnumerable<StorageListItem>> ListAsync(bool getMetadata, C
return _storage.ListAsync(getMetadata, cancellationToken);
}

/// <summary>
/// Forwards the top-level listing request to the wrapped storage.
/// </summary>
/// <param name="getMetadata">Forwarded unchanged to the wrapped storage.</param>
/// <param name="cancellationToken">Forwarded unchanged to the wrapped storage.</param>
/// <returns>The task returned by the wrapped storage.</returns>
public override Task<IEnumerable<StorageListItem>> ListTopLevelAsync(bool getMetadata, CancellationToken cancellationToken) =>
    _storage.ListTopLevelAsync(getMetadata, cancellationToken);

public override Task SetMetadataAsync(Uri resourceUri, IDictionary<string, string> metadata)
{
return _storage.SetMetadataAsync(resourceUri, metadata);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand Down Expand Up @@ -33,13 +33,13 @@ public class DetailedReportPostProcessorFacts
public async Task DoesntStartIfNoSuccessFile()
{
_sourceStorageMock
.Setup(ss => ss.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Setup(ss => ss.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new List<StorageListItem>());

await _target.CopyReportsAsync();

_sourceStorageMock
.Verify(ss => ss.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Once);
.Verify(ss => ss.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()), Times.Once);
_sourceStorageMock.VerifyNoOtherCalls();
_workStorageMock.VerifyNoOtherCalls();
_destinationStorageMock.VerifyNoOtherCalls();
Expand Down Expand Up @@ -186,7 +186,7 @@ public async Task SkipsProcessedFiles()
{ "FilesCreated", "123" }
};
_workStorageMock
.Setup(ss => ss.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Setup(ss => ss.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(() => new List<StorageListItem>(_workFiles.Select(f => Blob(
_workStorageMock,
f,
Expand Down Expand Up @@ -286,7 +286,7 @@ private static void SetupStorageMock(Mock<IStorage> mock, string baseUrl, Func<L
return files().Contains(filename);
});
mock
.Setup(s => s.ListAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.Setup(s => s.ListTopLevelAsync(It.IsAny<bool>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(() => new List<StorageListItem>(files().Select(f => Blob(mock, f))));
mock
.Setup(s => s.ResolveUri(It.IsAny<string>()))
Expand Down